Cleaning out old AsyncComm Python stuff

This commit is contained in:
Dan
2021-04-01 14:01:13 -04:00
parent 6f77869ee1
commit 57b69005c7
12 changed files with 0 additions and 2200 deletions
@@ -1,15 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
@@ -1,15 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -1,178 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
'''
The binary packet codec and supporting objects
This also contains annotations particular to the binary packet codec.
Those are implemented via classes nested in the codec itself.
'''
from __future__ import print_function
from ghidra.comm.packet import (BinaryBuffer, PacketDecodeError, Packet,
PacketCodec, bytearray_t)
from .annot import Annotation
class BinaryPacketCodec(PacketCodec):
'''
The binary packet codec
This encodes to Python ``bytes`` objects, using ``bytearray`` as
the temporary buffers. Also, because the encoding and decoding to
binary is implemented in the types themselves, this codec simply
delegates to those routines.
'''
def finish_encode(self, buf):
return bytes(buf.buf)
def get_factory_for_annotation(self, pktCls, fld, annot):
if isinstance(annot, SequenceTerminated):
class Terminated:
def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc,
codec, chain):
n = PacketCodec.build_chain_for_type(
pktCls, fld, tdesc, codec, chain)
class terminated_field_codec:
def encode_field(self, buf, pkt, val):
n.encode_field(buf, pkt, val)
if annot.cond != '':
if getattr(pkt, annot.cond) is None:
return
buf.buf.extend(annot.tok)
def decode_field(self, pkt, buf, count, factory):
i = buf.buf.find(annot.tok)
if i == -1:
if annot.cond != '':
return n.decode_field(pkt, buf, count,
factory)
elif len(buf.buf) == 0:
raise PacketDecodeError("Buffer is empty")
else:
raise PacketDecodeError(
"Missing terminator sequence %r" % (
annot.tok))
part = BinaryBuffer(buf.buf[:i])
buf.buf[:] = buf.buf[i + len(annot.tok):]
ret = n.decode_field(pkt, part, count, factory)
return ret
return terminated_field_codec()
return Terminated()
elif isinstance(annot, ReverseByteOrder):
class Reversed:
def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc,
codec, chain):
n = PacketCodec.build_chain_for_type(
pktCls, fld, tdesc, codec, chain)
class byte_reversed_field_codec:
def encode_field(self, buf, pkt, val):
orig = buf.order()
buf.order('<' if orig == '>' else '>')
n.encode_field(buf, pkt, val)
buf.order(orig)
def decode_field(self, pkt, buf, count, factory):
orig = buf.order()
buf.order('<' if orig == '>' else '>')
ret = n.decode_field(pkt, buf, count, factory)
buf.order(orig)
return ret
return byte_reversed_field_codec()
return Reversed()
return PacketCodec.get_factory_for_annotation(self, pktCls, fld, annot)
def get_field_codec_for_type(self, pktCls, fld, tdesc):
if issubclass(tdesc.type(), Packet):
return PacketCodec.get_field_codec_for_type(self, pktCls, fld,
tdesc)
elif tdesc == bytearray_t:
class bytearray_field_codec:
def encode_field(self, buf, pkt, val):
buf.buf.extend(val)
def decode_field(self, pkt, buf, count, factory):
ret = bytearray(buf.buf)
buf.buf[:] = ''
return ret
return bytearray_field_codec()
else:
class field_codec:
def encode_field(self, buf, pkt, val):
tdesc.encode_binary(buf, val)
def decode_field(self, pkt, buf, count, factory):
return tdesc.decode_binary(buf, count)
return field_codec()
def measure_decode_remaining(self, buf):
return len(buf.buf)
def new_decode_buffer(self, data):
return BinaryBuffer(data)
def new_encode_buffer(self):
return BinaryBuffer()
def backup(self, buf):
return bytearray(buf.buf)
def restore(self, buf, backup):
buf.buf[:] = backup
class BinaryLEPacketCodec(BinaryPacketCodec):
def new_decode_buffer(self, data):
ret = BinaryBuffer(data)
ret.order('<')
return ret
def new_encode_buffer(self):
ret = BinaryBuffer()
ret.order('<')
return ret
# Codec-specific annotations below
class SequenceTerminated(Annotation):
def __init__(self, value, cond=''):
self.tok = value
self.cond = cond
class NullTerminated(SequenceTerminated):
def __init__(self, value=1, cond=''):
SequenceTerminated.__init__(self, '\x00' * value, cond)
class ReverseByteOrder(Annotation):
def __init__(self):
pass
@@ -1,63 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
'''
Placeholders for annotations particular to string packet codecs
The string packet codec is not yet ported to Python, so there is no
sense in exporting string-based packet types. However, the test packet
types are annotated with binary and string codecs in mind, so these
annotations must be defined.
'''
from .annot import Annotation
class RegexSeparated(Annotation):
def __init__(self, exp, tok, optional=True):
self.exp = exp
self.optional = optional
self.tok = tok
class RegexTerminated(Annotation):
def __init__(self, exp, tok, cond=''):
self.exp = exp
self.cond = cond
self.tok = tok
class SizeRestricted_PadDirection:
LEFT = 0
NONE = 1
RIGHT = 2
class SizeRestricted(Annotation):
def __init__(self, direction=SizeRestricted_PadDirection.NONE, pad=' ',
value=None, min=0, max=None): # @ReservedAssignment
self.direction = direction
self.pad = pad
self.value = value
self.min = min
self.max = max
class WithRadix(Annotation):
def __init__(self, value):
self.value = value
@@ -1,70 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from java.lang import (Object, Enum)
class BitmaskSet(Object):
def __init__(self, universe, bitmask=0):
if not issubclass(universe, Enum):
raise TypeError("BitmaskSet must have an Enum as the universe")
self.universe = universe
self.bitmask = bitmask
def add(self, elem):
if not isinstance(elem, self.universe):
raise TypeError("Element must be a member of the universe")
self.bitmask |= elem.mask
def discard(self, elem):
if not isinstance(elem, self.universe):
return
self.bitmask &= ~elem.mask
@classmethod
def of(cls, universe, *args):
bs = cls(universe, 0)
for a in args:
bs.add(a)
return bs
@classmethod
def validate(cls, val, E):
if val is None:
return
elif not isinstance(val, cls):
raise TypeError("Value must be a %s. Got %r" % (cls.__name__, val))
elif not val.universe == E.type():
raise TypeError("Must be a set of %s. Got %r" %
(E.type(), val.universe))
def __contains__(self, elem):
if not isinstance(elem, self.universe):
return False
else:
return (self.bitmask & elem.mask) != 0
def __eq__(self, other):
if isinstance(other, BitmaskSet):
return self.universe == other.universe and self.bitmask == self.bitmask
return self.issubset(other) and other.issubset(self)
def __iter__(self):
for elem in self.universe.values:
if elem in self:
yield elem
def __repr__(self):
return '[%s]' % (', '.join(self))
@@ -1,15 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
@@ -1,225 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
'''
Classes corresponding to common Java types
These are not by any means ports of the corresponding Java types.
Instead, they just populate the same name space so that exporting need
not map things over. They also contain binary encoding and decoding
logic.
'''
class ObjectMeta(type):
def __repr__(self):
return self.__module__ + '.' + self.__name__
def __new__(self, classname, bases, classdict):
if len(bases) == 1:
classdict['sup'] = bases[0]
elif classname == 'Object':
pass
else:
raise TypeError("Too many bases for %s: %r" % (classname, bases,))
return type.__new__(self, classname, bases, classdict)
class Object:
__metaclass__ = ObjectMeta
@classmethod
def validate(cls, val):
pass
@classmethod
def encode_binary(cls, buf, val):
raise ValueError("Cannot encode as %r" % (cls,))
@classmethod
def decode_binary(cls, buf, count):
raise ValueError("Cannot decode as %r" % (cls,))
@classmethod
def subst_targs(cls):
return {}
@classmethod
def resolve_targs(cls, sup, **targs):
cur = cls
while cur != sup:
if cur == Object:
raise ValueError(
"%r is not in the hierarchy of %r" % (sup, cls))
targs = cur.subst_targs(**targs)
cur = cur.sup
return targs
class Number(Object):
@classmethod
def validate(cls, val):
if val is None:
return
elif not isinstance(val, (int, long, float)):
raise TypeError(
"Value for %s for be numeric. Got %r" % (cls.__name__, val))
@classmethod
def encode_binary(cls, buf, val):
buf.put_Struct(val, cls.struct)
@classmethod
def decode_binary(cls, buf, count):
return buf.get_Struct(cls.struct)
class IntegralNumber(Number):
@classmethod
def validate(cls, val):
if val is None:
return
elif not isinstance(val, (int, long)):
raise TypeError(
"Value for %s must be integral. Got %r" % (cls.__name__, val))
elif not cls.min <= val < cls.max:
raise ValueError("Value for %s must be in [%s, %s). Got %r" % (
cls.__name__, cls.min, cls.max, val))
class Boolean(Object):
@classmethod
def validate(cls, val):
if val is None:
return
elif not isinstance(val, bool):
raise TypeError(
"Value for %s must be boolean. Got %r" % (cls.__name__, val))
@classmethod
def encode_binary(cls, buf, val):
buf.put_Boolean(val)
@classmethod
def decode_binary(cls, buf, count):
return buf.get_Boolean()
class Byte(IntegralNumber):
min = -0x80
max = 0x80
struct = 'b'
class Character(Object):
@classmethod
def validate(cls, val):
if val is None:
return
elif not isinstance(val, (str, unicode)) or not len(val) == 1:
raise ValueError(
"Value for %r must be a string of length 1. Got %r" % (
cls.__name__, val))
@classmethod
def encode_binary(cls, buf, val):
buf.put_Character(val)
@classmethod
def decode_binary(cls, buf, count):
return buf.get_Character()
class Short(IntegralNumber):
min = -0x8000
max = 0x8000
struct = 'h'
class Integer(IntegralNumber):
min = -0x80000000
max = 0x80000000
struct = 'i'
class Long(IntegralNumber):
min = -0x8000000000000000
max = 0x8000000000000000
struct = 'q'
class Float(Number):
struct = 'f'
class Double(Number):
struct = 'd'
class Enum(Object):
def __repr__(self):
return self.name
@classmethod
def validate(cls, val):
if val is None:
return
elif not isinstance(val, cls):
raise ValueError("Value must be from the enumeration %r" % (cls,))
@classmethod
def encode_binary(cls, buf, val):
if len(cls.values) < 0x100:
buf.put_Struct(val.ordinal, 'B')
elif len(cls.values) < 0x10000:
buf.put_Struct(val.ordinal, 'H')
elif len(cls.values) < 0x100000000:
buf.put_Struct(val.ordinal, 'I')
else:
buf.put_Struct(val.ordinal, 'Q')
@classmethod
def decode_binary(cls, buf, count):
if len(cls.values) < 0x100:
return cls.values[buf.get_Struct('B')]
elif len(cls.values) < 0x10000:
return cls.values[buf.get_Struct('H')]
elif len(cls.vaules) < 0x100000000:
return cls.values[buf.get_Struct('I')]
else:
return cls.values[buf.get_Struct('Q')]
class String(Object):
@classmethod
def validate(cls, val):
if val is None:
return
elif not isinstance(val, (str, unicode)):
raise ValueError("Value must be a string")
@classmethod
def encode_binary(cls, buf, val):
buf.put_String(val)
@classmethod
def decode_binary(cls, buf, count):
return buf.get_String(count)
@@ -1,70 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
'''
Classes corresponding to common Java collection types
These are not ports by any means of the corresponding Java types.
Instead, they just populate the same name space so that exporting need
not map things over. I also use them as convenient places to implement
a common interface.
'''
from java.lang import Object
class Collection(Object):
@classmethod
def validate(cls, val, **targs):
E = cls.resolve_targs(Collection, **targs)['E']
if not hasattr(val, '__iter__') or not hasattr(val, '__len__'):
raise ValueError(
"Value for %s must behave list a collection " +
"(__iter__ and __len__)")
for e in val:
E.validate(e)
@classmethod
def subst_targs(cls, E):
return {}
class List(Collection):
@classmethod
def validate(cls, val, **targs):
E = cls.resolve_targs(List, **targs)['E']
if not hasattr(val, '__getitem__') or not hasattr(val, '__len__'):
raise ValueError(
"Value for %s must behave list a list " +
"(__getitem__ and __len__)")
for e in val:
E.validate(e)
@classmethod
def subst_targs(cls, E):
return dict(E=E)
@classmethod
def add_to(cls, l, item):
l.append(item)
class ArrayList(List):
@classmethod
def new(cls):
return list()
@@ -1,26 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from ghidra.comm.packet import typedesc
import java.lang
import java.util
class PacketTestClasses_TestMessageFullSpecColField_IntList(java.util.ArrayList):
@classmethod
def subst_targs(cls):
return dict(E=typedesc(lambda: java.lang.Integer))
@@ -1,372 +0,0 @@
## ###
# IP: GHIDRA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
from test import test_support
import unittest
from .packet import Packet, DefaultPacketFactory, DEFAULT_FACTORY
from .packet.binary import BinaryPacketCodec
from .tests.packet import *
from .util import BitmaskSet
def update_list(left, right):
for i, (l, r) in enumerate(zip(left, right)):
if isinstance(l, Packet) and isinstance(r, dict):
update_packet(l, r)
elif isinstance(l, list) and isinstance(r, list):
assert len(l) == len(r)
update_list(l, r)
else:
left[i] = r
def update_packet(pkt, vals):
for k, v in vals.items():
cur = getattr(pkt, k)
if isinstance(cur, Packet) and isinstance(v, dict):
update_packet(cur, v)
elif isinstance(cur, list) and isinstance(v, list):
assert len(k) == len(v)
update_list(cur, v)
else:
setattr(pkt, k, v)
class BinaryPacketCodecTest(unittest.TestCase):
factory = DEFAULT_FACTORY
def build_dec(self):
if isinstance(self.dec_parts, tuple):
return self.pkt_cls(*self.dec_parts)
else:
return self.pkt_cls(**self.dec_parts)
def setUp(self):
self.codec = BinaryPacketCodec()
self.codec.register_packet_cls(self.pkt_cls)
self.factory.register_types(self.codec)
self.enc = bytes(''.join(self.enc_parts))
self.pkt = self.build_dec()
if hasattr(self, 'dec_extra'):
self.dec = self.build_dec()
update_packet(self.dec, self.dec_extra)
else:
self.dec = self.pkt
def test_encode(self):
enc = self.codec.encode_packet(self.pkt)
self.assertEquals(self.enc, enc)
def test_decode(self):
dec = self.codec.decode_packet(self.pkt_cls, self.enc, self.factory)
self.assertEquals(self.dec, dec)
class TestFlatTypes(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageFlatTypes
enc_parts = ('\x01', '\x02', '\x00\x33', '\x00\x04', '\x00\x00\x00\x05',
'\x00\x00\x00\x00\x00\x00\x00\x06', 'seven\x00',
'\x41\x00\x00\x00', '\x40\x22\x00\x00\x00\x00\x00\x00')
dec_parts = (True, 2, '3', 4, 5, 6, 'seven', 8, 9)
class TestFlatMixedEndian(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageFlatMixedEndian
enc_parts = ('\x00\x31', '\x32\x00', '\x00\x00\x00\x03',
'\x04\x00\x00\x00', 'five\x00', 'six\x00\x00',
'\x00\x00\x00\x00\x00\x00\x00\x07',
'\x08\x00\x00\x00\x00\x00\x00\x00',
'\x41\x10\x00\x00', '\x00\x00\x20\x41',
'\x40\x26\x00\x00\x00\x00\x00\x00',
'\x00\x00\x00\x00\x00\x00\x28\x40')
dec_parts = ('1', '2', 3, 4, 'five', 'six', 7, 8, 9, 10, 11, 12)
class TestSizedString(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageSizedString
enc_parts = ('\x00\x00\x00\x07', 'Testing', '\x00\x00\x00\x04')
dec_parts = dict(str='Testing', more=4)
dec_extra = dict(len=7)
class TestMethodSizedString(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageMethodSizedString
enc_parts = ('\x00\x00\x00\x0c', 'Testing2')
dec_parts = dict(str='Testing2')
dec_extra = dict(len=12)
def setUp(self):
BinaryPacketCodecTest.setUp(self)
def getLen(self):
return self.len - 4
def setLen(self, len):
self.len = len + 4
PacketTestClasses_TestMessageMethodSizedString.getLen = getLen
PacketTestClasses_TestMessageMethodSizedString.setLen = setLen
class TestCountedShortArray(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageCountedShortArray
enc_parts = ('\x00\x00\x00\x04', '\x00\x00', '\x01\x00',
'\x02\x00', '\x03\x00', '\x00\x00\xbe\xef')
dec_parts = dict(more=0xbeef, arr=[0, 1, 2, 3])
dec_extra = dict(count=4)
class TestDynamicTyped1(BinaryPacketCodecTest):
# NOTE: Not sure how to test/implement the plain TestMessageDynamicTyped
# The type of the field takes Integer or Long, which are indistinguishable
# in Python
pkt_cls = PacketTestClasses_TestMessageDynamicTypedSubs
enc_parts = ('\x00\x00\x00\x01', '\x00\x00\x00\x03')
dec_parts = dict(
sub=PacketTestClasses_TestMessageDynamicTypedSubs_IntTestMessage(3))
dec_extra = dict(type=1)
class TestDynamicTyped2(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageDynamicTypedSubs
enc_parts = ('\x00\x00\x00\x02', '\x00\x00\x00\x00\x00\x00\x00\x04')
dec_parts = dict(
sub=PacketTestClasses_TestMessageDynamicTypedSubs_LongTestMessage(4))
dec_extra = dict(type=2)
class TestUnmeasuredCollection(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageUnmeasuredCollection
enc_parts = ('\x02', '\x00\x07', 'Testing',
'\x01', '\x00\x04', '\x00\x00\x00\x01',
'\x01', '\x00\x04', '\x00\x00\x00\x02')
dec_parts = ([
PacketTestClasses_TestMessageUnmeasuredCollection_TestElement(
val='Testing'),
PacketTestClasses_TestMessageUnmeasuredCollection_TestElement(val=1),
PacketTestClasses_TestMessageUnmeasuredCollection_TestElement(val=2),
],)
dec_extra = dict(col=[
dict(type=2, len=7),
dict(type=1, len=4),
dict(type=1, len=4),
])
class TestFullSpecColField(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageFullSpecColField
enc_parts = ('\x00\x00\x00\x00', '\x00\x00\x00\x01',
'\x00\x00\x00\x02', '\x00\x00\x00\x03')
dec_parts = ([0, 1, 2, 3],)
class TestLookahead1(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageLookahead
enc_parts = ('Int', '\x00\x00\x00\x03')
dec_parts = (PacketTestClasses_TestMessageLookahead_IntTestMessage(3),)
class TestLookahead2(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageLookahead
enc_parts = ('Long', '\x00\x00\x00\x00\x00\x00\x00\x04')
dec_parts = (PacketTestClasses_TestMessageLookahead_LongTestMessage(4),)
class TestDoubleTermed(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageDoubleTermed
enc_parts = ('\x00\x00\x00\x07', 'Testing\x00')
dec_parts = dict(str='Testing')
dec_extra = dict(len=7)
class TestOptional1(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageOptional
enc_parts = ('SomeString',)
dec_parts = dict(f1='SomeString')
class TestOptional2(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageOptional
enc_parts = ('SomeString\x00', '\x00\x00\x00\x33')
dec_parts = dict(f1='SomeString', opt=51)
class TestEnum1(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageEnum
enc_parts = ('\x01', '\x00\x00\x00\x33')
dec_parts = (PacketTestClasses_TestMessageEnum_TestEnum.ON, 51)
class TestEnum2(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageEnum
enc_parts = ('\x02', '\x00\x00\x00\x33')
dec_parts = (PacketTestClasses_TestMessageEnum_TestEnum.ONE, 51)
class LongFactory(DefaultPacketFactory):
def register_types(self, codec):
codec.register_packet_cls(PacketTestClasses_LongTestNumber)
def new_packet(self, pktCls):
#print("new_packet: %r" % (pktCls))
if pktCls == PacketTestClasses_AbstractTestNumber:
#print(" Provided LongTestNumber")
return PacketTestClasses_LongTestNumber()
#print(" Provided default")
return DefaultPacketFactory.new_packet(self, pktCls)
class TestAbstract1(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageAbstractTestNumber
enc_parts = (
'\x00\x00\x00\x02',
'\x00\x00\x00\x00\x00\x00\x00\x05',
'\x00\x00\x00\x00\x00\x00\x00\x06',
'\x00\x00\x30\x39',
)
dec_parts = dict(
numbers=[
PacketTestClasses_LongTestNumber(5),
PacketTestClasses_LongTestNumber(6),
],
follows=12345,
)
factory = LongFactory()
dec_extra = dict(count=2)
class IntFactory(DefaultPacketFactory):
def register_types(self, codec):
codec.register_packet_cls(PacketTestClasses_IntTestNumber)
def new_packet(self, pktCls):
if pktCls == PacketTestClasses_AbstractTestNumber:
return PacketTestClasses_IntTestNumber()
return DefaultPacketFactory.new_packet(self, pktCls)
class TestAbstract2(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageAbstractTestNumber
enc_parts = (
'\x00\x00\x00\x02',
'\x00\x00\x00\x05',
'\x00\x00\x00\x06',
'\x00\x00\x30\x39',
)
dec_parts = dict(
numbers=[
PacketTestClasses_IntTestNumber(5),
PacketTestClasses_IntTestNumber(6),
],
follows=12345,
)
factory = IntFactory()
dec_extra = dict(count=2)
class TestTypedByMap1(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageTypedByMap
enc_parts = ('\x00', '\x00\x00\x00\x05')
dec_parts = dict(
sub=PacketTestClasses_TestMessageTypedByMap_TestASubByMap(5))
dec_extra = dict(type=PacketTestClasses_TestMessageTypedByMap_TestEnum.A)
class TestTypedByMap2(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageTypedByMap
enc_parts = ('\x01', '\x00\x00\x00\x00\x00\x00\x00\x06')
dec_parts = dict(
sub=PacketTestClasses_TestMessageTypedByMap_TestBSubByMap(6))
dec_extra = dict(type=PacketTestClasses_TestMessageTypedByMap_TestEnum.B)
class TestFlags1(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageFlags
enc_parts = ('\x02')
dec_parts = dict()
dec_extra = dict(flags=BitmaskSet.of(
PacketTestClasses_TestMessageFlags_TestFlags,
PacketTestClasses_TestMessageFlags_TestFlags.SECOND
))
class TestFlags2(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageFlags
enc_parts = ('\x03', '\x11\x22\x33\x44\x55\x66\x77\x88')
dec_parts = dict(first=0x1122334455667788L)
dec_extra = dict(flags=BitmaskSet.of(
PacketTestClasses_TestMessageFlags_TestFlags,
PacketTestClasses_TestMessageFlags_TestFlags.FIRST,
PacketTestClasses_TestMessageFlags_TestFlags.SECOND,
))
class TestFlags3(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageFlags
enc_parts = ('\x00', '\x12\x34\x56\x78')
dec_parts = dict(second=0x12345678)
dec_extra = dict(flags=BitmaskSet.of(
PacketTestClasses_TestMessageFlags_TestFlags,
))
class TestFlags4(BinaryPacketCodecTest):
pkt_cls = PacketTestClasses_TestMessageFlags
enc_parts = ('\x01', '\x11\x22\x33\x44\x55\x66\x77\x88',
'\x12\x34\x56\x78')
dec_parts = dict(first=0x1122334455667788L, second=0x12345678)
dec_extra = dict(flags=BitmaskSet.of(
PacketTestClasses_TestMessageFlags_TestFlags,
PacketTestClasses_TestMessageFlags_TestFlags.FIRST,
))
def test_main():
test_support.run_unittest(
TestFlatTypes,
TestFlatMixedEndian,
TestSizedString,
TestMethodSizedString,
TestCountedShortArray,
TestDynamicTyped1,
TestDynamicTyped2,
TestUnmeasuredCollection,
TestFullSpecColField,
TestLookahead1,
TestLookahead2,
TestDoubleTermed,
TestOptional1,
TestOptional2,
TestEnum1,
TestEnum2,
TestAbstract1,
TestAbstract2,
TestTypedByMap1,
TestTypedByMap2,
TestFlags1,
TestFlags2,
TestFlags3,
TestFlags4,
)
if __name__ == '__main__':
test_main()