diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/__init__.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/__init__.py deleted file mode 100644 index 8c2a3298a5..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/__init__.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/__init__.py deleted file mode 100644 index 8c2a3298a5..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/__init__.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/__init__.py deleted file mode 100644 index 7928f76681..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/__init__.py +++ /dev/null @@ -1,633 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -''' -A miniature port of the same ghidra.comm.packet library from Java into -Python - -This package is necessary to use any packet formats exported by -ghidra.comm.packet.util.pyexport.GeneratePython. Please see its -javadocs for more information and caveats. - -Most of the classes here correspond very closely with the same-named -class in the Java implementation, which is the reference -implementation. Please see the javadocs to understand how the pieces -all fit. Unless the correspondence is not obvious, no additional -documentation is provided in the Python port. -''' -from __future__ import print_function - -import codecs -import struct -import sys - -import java.lang -import java.util - - -def printargs(func): - ''' - A debugging annotation to print the arguments and return value - ''' - def _pa(*args, **kwargs): - print("before %s: %r, %r" % (func.func_name, args, kwargs)) - result = func(*args, **kwargs) - print("after %s: %r, %r, %r" % (result, func.func_name, args, kwargs)) - return result - return _pa - - -class DefaultPacketFactory: - ''' - The default packet factory - - Corresponds to the class of the same full name in Java. Python's - arrays behave like collections, so there is not a ``new_array()`` - method. - ''' - - def register_types(self, codec): - pass - - def new_collection(self, colCls): - if colCls == java.util.List: - return java.util.ArrayList.new() - else: - return colCls.new() - - def new_packet(self, pktCls): - return pktCls() - - -DEFAULT_FACTORY = DefaultPacketFactory() - - -class SubstitutingPacketFactory(DefaultPacketFactory): - ''' - A packet factory that uses a dictionary of substitutions - - Corresponds loosely to ghidra.comm.packet.AbstractPacketFactory - ''' - - def __init__(self, substs): - self.substs = substs - - def register_types(self, codec): - for v in self.substs.values(): - codec.register_packet_cls(v) - - def new_packet(self, pktCls): - if pktCls in self.substs: - return self.substs[pktCls]() - return DefaultPacketFactory.new_packet(self, pktCls) - - -class PacketDecodeError(Exception): - pass - - -class field: - ''' - Loosely corresponds to ghidra.comm.packet.PacketField - - This must also be given information about the declared type of the - field in Java. Type information is represented using - :py:class:`typedesc`. - - Because Python does not permit annotating fields, methods must - be annotated instead. The method should contain no code, other - than ``pass``, since that code is ignored. The method name - serves as the field name. For example, to declare a field of - Java type ``int``:: - - @field(typedesc(lambda:java.lang.Integer)) - def f1(self): - pass - - :param typedesc tdesc: A descriptor of the field's type - :param default: The default value of the field when constructed - :param fixed: If final, the constant value of the field - ''' - - _nextIdx = 0 - - @classmethod - def _next_idx(cls): - ''' - Retrieve and increments a static counter - - This is used to uniquely number every declared field in order - of declaration. This information is used when encoding and - decoding fields. - - Note that packet extension is not supported, since packets are - expected to be exported. Rather than exporting the type tree, - the exporter should just flatten each packet definition as a - stand-alone containing all of its fields, including those of - its super class(es). - ''' - - ret = cls._nextIdx - cls._nextIdx += 1 - return ret - - def __init__(self, tdesc, default=None, fixed=None): - self._idx = self._next_idx() - self.tdesc = tdesc - self.default = default - self.fixed = fixed - self.annotations = [] - - def __call__(self, func): - ''' - Apply the annotation to a placeholder method - ''' - - self.name = func.__name__ - return self - - def __repr__(self): - if hasattr(self, 'name'): - return '' % (self.tdesc, self.name) - else: - return '' % (id(self),) - - def validate(self, value): - ''' - Check that the field's value conforms to its declared type - ''' - - if self.fixed is not None: - raise AttributeError("Field has fixed value") - self.tdesc.validate(value) - - def build_field_codec(self, pktCls, field, tdesc, codec): - ''' - Build the field codec chain for this field - ''' - - assert self == field - return codec.build_field_codec(pktCls, self, tdesc) - - -class typedesc: - ''' - A description of a field's type - - Because Python is loosely-typed, and packets are not, some - information about the declared type of a field ported from Java - must be carried around. These objects are loosely corresponded to - Java's own java.lang.reflect.Type. ``field_type`` gives the raw - class of the described type. This is usually a Python class having - the same full name as the Java class it corresponds to or holds - the place of. ``targs`` is a dictionary of whose keys are type - variable names from the corresponding Java class. Its values are - other ``typedesc`` objects. For example, a Java ``List`` - would be described using:: - - typedesc(lambda:java.util.List, E=typedesc(lambda:java.lang.Short))) - - Note the use of lambda functions to obtain the actual class - references. This is unfortunate, but required since Java is a - compiled language that permits use before declaration. The - ``typedesc`` is constructed upon declaring the packet class, so - referring to other packet classes needs to be delayed until all - declarations have been interpreted. - - :param field_type: corresponds to the raw class for the type - :param targs: a dictionary of type variable substitutions - ''' - - def __init__(self, field_type, **targs): - self.type = field_type - self.targs = targs - - def __repr__(self): - if len(self.targs) == 0: - return repr(self.type()) - return '%s<%s>' % (self.type(), ', '.join( - "%s=%s" % arg - for arg in self.targs.items() - )) - - def validate(self, value): - ''' - Check that a given value conforms to this type - ''' - - self.type().validate(value, **self.targs) - - def encode_binary(self, buf, value): - ''' - Encode the given value as binary into the given buffer - - Unlike the Java implementation, where encoding and decoding is - performed strictly by a codec, in Python, the serialization of - primitives is performed in the ``typedesc``. - - :param bytearray buf: the buffer to append the encoded data to - :param value: the value to encode - ''' - - # print("encoding into %r, %r as %r" % (buf, value, self.type())) - self.type().encode_binary(buf, value, **self.targs) - - def decode_binary(self, buf, count): - ''' - Decode some values of this type from the given buffer - - See :py:method:`encode_binary` - ''' - - return self.type().decode_binary(buf, count, **self.targs) - - def __eq__(self, that): - return self.type() == that.type() and self.targs == that.targs - - def resolve_targs(self, cls): - ''' - Compute the type substitutions applied to a given super class - - This returns a dictionary whose keys are type parameter names - of the given super class. The values are other ``typedesc`` - objects that were substituted in the course of describing this - type from the perspective of that superclass. For example, the - type description corresponding to ``List`` would - return the dictionary ``{'E': java.lang.Integer}`` for a call - to ``resolve_targs(java.util.Collection)``. - - :param cls: A Python super class of this description's raw type - :return: a dictionary of type parameters substitutions - ''' - - if cls == self.type: - return self.targs - return self.type().resolve_targs(cls, **self.targs) - - -bytearray_t = typedesc( - lambda: java.util.List, E=typedesc(lambda: java.lang.Byte)) - - -class BinaryBuffer: - ''' - A wrapper around Python's ``bytearray`` to behave more like - java.nio.ByteBuffer. - ''' - - def __init__(self, buf=''): - self.buf = bytearray(buf) - self._order = '>' - self.dec = codecs.lookup('UTF-8').incrementaldecoder() - - def __repr__(self): - return 'BinaryBuffer_%x(%r)' % (id(self), self.buf,) - - def order(self, order=None): - ret = self._order - self._order = order or self._order - return ret - - def put_Struct(self, val, s): - ''' - Encode a binary value using a given ``struct`` format string - ''' - - s = struct.Struct(self._order + s) - self.buf.extend(s.pack(val)) - - def get_Struct(self, s): - ''' - Decode a binary value using a given ``struct`` format string - ''' - - s = struct.Struct(self._order + s) - part, self.buf[:] = bytes(self.buf[:s.size]), self.buf[s.size:] - return s.unpack(part)[0] - - def put_Boolean(self, val): - self.buf.append(1 if val else 0) - - def get_Boolean(self): - ret, self.buf[:] = self.buf[0], self.buf[1:] - return ret != 0 - - def put_Character(self, val): - charset = 'UTF-16-BE' if self._order == '>' else 'UTF-16-LE' - self.buf.extend(val.encode(charset)) - - def get_Character(self): - charset = 'UTF-16-BE' if self._order == '>' else 'UTF-16-LE' - ret, self.buf[:] = self.buf[:2].decode(charset), self.buf[2:] - return ret - - def put_String(self, val): - self.buf.extend(val.encode('UTF-8')) - - def get_String(self, count): - if count is None: - ret = str(self.buf) - self.buf[:] = '' - return ret - else: - self.dec.reset() - result = '' - while len(self.buf) > 0 and len(result) < count: - n, self.buf[:] = self.buf[:1], self.buf[1:] - result += self.dec.decode(n) - return result - - -class PacketCodec: - - def __init__(self): - self.chains = {} - self.codecs = {} - self.registry = set() - - def decode_and_store_field(self, pkt, fld, buf, factory): - if fld.fixed is not None: - count = self.measure_count(fld.fixed) - else: - count = None - val = self.decode_field(pkt, fld, buf, count, factory) - if fld.fixed is not None: - if val != fld.fixed: - raise PacketDecodeError( - "Fixed value mismatch for %s (%r != %r)" % ( - fld.name, fld.fixed, val)) - else: - setattr(pkt, fld.name, val) - - def decode_field(self, pkt, fld, buf, count, factory): - codec = self.codecs[pkt.__class__, fld.name] - return codec.decode_field(pkt, buf, count, factory) - - def decode_packet(self, pktCls, data, factory=DEFAULT_FACTORY): - if pktCls not in self.registry: - raise TypeError("Cannot decode unregistered packet type: %r" % ( - pktCls,)) - buf = self.new_decode_buffer(data) - result = self._decode_packet(None, None, buf, pktCls, factory) - if self.measure_decode_remaining(buf) != 0: - raise PacketDecodeError( - "Packet did not consume given buffer: %r" % ( - buf,)) - return result - - def _decode_packet(self, parent, fld, buf, pktCls, factory): - val = factory.new_packet(pktCls) - if val.__class__ not in self.registry: - raise TypeError( - "Decoding %r: constructed type %r is not " + - "registered with this codec" % ( - fld, val.__class__)) - val.parent = parent - for f in val._fields: - self.decode_and_store_field(val, f, buf, factory) - return val - - def encode_field(self, buf, pkt, fld, val): - codec = self.codecs[pkt.__class__, fld.name] - codec.encode_field(buf, pkt, val) - - def encode_packet(self, val, buf=None, fld=None): - pktCls = val.__class__ - if pktCls not in self.registry: - if fld is None: - raise TypeError( - "Cannot encode unregistered packet type: %r" % ( - pktCls,)) - else: - raise TypeError( - "Field %r has value of an unregistered type: %r" % ( - fld, pktCls,)) - if buf is None: - buf = self.new_encode_buffer() - for f in val._fields: - self.encode_field(buf, val, f, f.fixed or val._vals[f.name]) - return self.finish_encode(buf) - - def get_factory_for_annotation(self, pktCls, fld, annot): - if hasattr(annot, 'get_factory'): - return annot.get_factory() - else: - # Ignore the annotation - return None - - def get_field_codec_for_type(self, pktCls, fld, tdesc): - t = tdesc.type() - if issubclass(t, Packet): - codec = self - self.register_packet_cls(t) - - class packet_field_codec: - - def encode_field(self, buf, pkt, val): - codec.encode_packet(val, buf, fld) - - def decode_field(self, pkt, buf, count, factory): - return codec._decode_packet(pkt, fld, buf, t, factory) - return packet_field_codec() - return None - - def inject_chain(self, pktCls, tfld, wrapper, afld, annot): - if pktCls._fields.index(afld) <= pktCls._fields.index(tfld): - raise ValueError( - "Referenced field %r must precede annotated field %r" % ( - tfld, afld)) - self.chains[pktCls, tfld.name].insert(0, wrapper) - - def measure_count(self, obj): - return len(obj) - - def measure_encode_size(self, buf): - return len(buf.buf) - - def register_field(self, pktCls, fld, allFlds): - chain = [self] - for annot in fld.annotations: - factory = self.get_factory_for_annotation(pktCls, fld, annot) - if factory is None: - continue - chain.insert(0, factory) - self.chains[pktCls, fld.name] = chain - - def register_packet_cls(self, pktCls): - if pktCls in self.registry: - return # Avoid infinite recursion - self.registry.add(pktCls) - for fld in pktCls._fields: - self.register_field(pktCls, fld, pktCls._fields) - for fld in reversed(pktCls._fields): - chain = self.chains[pktCls, fld.name] - codec = self.build_chain_for_type( - pktCls, fld, fld.tdesc, self, chain) - self.codecs[pktCls, fld.name] = codec - - def split_buffer(self, buf, at): - ret = buf.buf[:at] - buf.buf[:at] = '' - return BinaryBuffer(ret) - - @classmethod - def build_chain_for_type(cls, pktCls, fld, tdesc, codec, chain): - n = chain.pop(0) - if len(chain) != 0: - temp = n.get_wrapped_field_codec_for_type( - pktCls, fld, tdesc, codec, chain) - else: - temp = n.get_field_codec_for_type(pktCls, fld, tdesc) - if temp is None: - raise TypeError( - "Codec or wrapper %r cannot provide encoder for %r" % ( - n, tdesc)) - return temp - - -def make_property(name, fld): - ''' - A property to control access to each field of a packet - - Used by the packet meta class, each field property ensures that the - assigned value conforms to the field's described type. The actual - values are kept in a separate dictionary that the packet codec can - access directly. - ''' - - @property - def _prop(self): - return self._vals[name] - - @_prop.setter # NOTE: This should have the same name as the getter - def _prop(self, value): - fld.validate(value) - self._vals[name] = value - - # NOTE: If the setter name is different, it's name must go here instead - return _prop - - -class PacketMeta(type): - ''' - A meta class for packets - - This meta class installs a property for each field, replacing the - annotated method. - ''' - - def __new__(self, classname, bases, classdict): - fields = [] - fields_by_name = {} - for name, fld in classdict.items(): - if isinstance(fld, field): - fields.append(fld) - fields_by_name[name] = fld - classdict[name] = make_property(name, fld) - fields.sort(key=lambda f: f._idx) - classdict['_fields'] = tuple(fields) - classdict['_fields_by_name'] = fields_by_name - - cls = type.__new__(self, classname, bases, classdict) - return cls - - -class Packet: - ''' - A packet - - All packet types are subclasses of ``Packet`` just as in the Java - implementation. Please see the :py:class:`field` annotation - regarding how to declare fields. - - Unlike the Java implementation, subclasses of other packet types is - not supported. Instead, each packet type should be based directly - on ``Packet``, and all fields, including those of its super classes - in Java, should be declared. The - ghidra.comm.util.pyexport.GeneratePython utility will do that - already. - ''' - - __metaclass__ = PacketMeta - - def __init__(self, *args, **kwargs): - self._vals = {} - if len(args) != 0: - if len(kwargs) != 0: - raise ValueError( - "%s: Cannot give positional arguments and keyword " + - "arguments" % ( - self.__class__.__name__)) - i = iter(args) - for fld in self._fields: - if fld.fixed is None: - try: - val = i.next() - setattr(self, fld.name, val) - except StopIteration: - raise ValueError( - "Not enough positional arguments given") - try: - i.next() - raise ValueError("Too many positional arguments given") - except StopIteration: - pass - else: - s = set(kwargs) - set(self._fields_by_name) - if len(s) > 0: - raise ValueError( - "%s: Gave invalid keyword arguments: %s" % ( - self.__class__.__name__, ','.join(s))) - for fld in self._fields: - self._vals[fld.name] = fld.fixed or fld.default - if fld.name in kwargs: - setattr(self, fld.name, kwargs[fld.name]) - - @classmethod - def validate(cls, value): - ''' - Check that the given value is an instance of this packet type - ''' - - if not isinstance(value, cls): - raise ValueError("Must be an instance of %s" % cls.__name__) - - def __repr__(self): - return '%s(%s)' % (self.__class__.__name__, ','.join( - '%s=%r' % (fld.name, self._vals[fld.name]) - for fld in self._fields - if fld.fixed is None - )) - - def __cmp__(self, that): - result = cmp(self.__class__, that.__class__) - if result != 0: - return result - for fld in self._fields: - if fld.fixed is not None: - continue - result = cmp(self._vals[fld.name], that._vals[fld.name]) - if result != 0: - return result - return 0 - - def __hash__(self): - parts = [] - parts.append(self.__class__) - for fld in self._fields: - if fld.fixed is not None: - continue - parts.append(self._vals[fld.name]) - return hash(tuple(parts)) diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/annot.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/annot.py deleted file mode 100644 index 12f30faa63..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/annot.py +++ /dev/null @@ -1,518 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -''' -Generic, universal annotations - -These correspond directly to those in the Java implementation. In the -Python implementation, the factories and codecs are directly nested in -their annotation classes. -''' - -from collections import OrderedDict -from copy import copy -import itertools - -from ghidra.comm.packet import (Packet, PacketCodec, PacketDecodeError, - bytearray_t, typedesc) -from ghidra.comm.util import BitmaskSet -import java.util - - -class Annotation: - - def __call__(self, field): - self.field = field - field.annotations.insert(0, self) - return field - - @classmethod - def findOnField(cls, field): - for annot in field.annotations: - if isinstance(annot, cls): - return annot - - -class CountedByField(Annotation): - - def __init__(self, value): - self.by = value - - def get_factory(self): - annot = self - - class InjectedCounter: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class counter_field_codec: - - def encode_field(self, buf, pkt, val): - counted_val = pkt._vals[annot.field.name] - count = codec.measure_count(counted_val) - setattr(pkt, annot.by, count) - n.encode_field(buf, pkt, count) - - def decode_field(self, pkt, buf, count, factory): - return n.decode_field(pkt, buf, count, factory) - return counter_field_codec() - - class Counted: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - codec.inject_chain( - pktCls, pktCls._fields_by_name[annot.by], - InjectedCounter(), fld, annot) - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class counted_field_codec: - - def encode_field(self, buf, pkt, val): - n.encode_field(buf, pkt, val) - - def decode_field(self, pkt, buf, count, factory): - count = pkt._vals[annot.by] - return n.decode_field(pkt, buf, count, factory) - return counted_field_codec() - return Counted() - - -class AbstractSizedBy(Annotation): - - def get_factory(self): - annot = self - - class InjectedSizer: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class sizer_field_codec: - - def encode_field(self, buf, pkt, val): - sized_val = pkt._vals[annot.field.name] - part = codec.new_encode_buffer() - codec.encode_field(part, pkt, annot.field, sized_val) - size = codec.measure_encode_size(part) - annot.set_size(pkt, size) - n.encode_field(buf, pkt, pkt._vals[annot.by]) - - def decode_field(self, pkt, buf, count, factory): - return n.decode_field(pkt, buf, count, factory) - return sizer_field_codec() - - class Sized: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - codec.inject_chain( - pktCls, pktCls._fields_by_name[annot.by], InjectedSizer(), - fld, annot) - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class sized_field_codec: - - def encode_field(self, buf, pkt, val): - n.encode_field(buf, pkt, val) - - def decode_field(self, pkt, buf, count, factory): - size = annot.get_size(pkt) - part = codec.split_buffer(buf, size) - ret = n.decode_field( - pkt, part, count, factory) - if codec.measure_decode_remaining(part) != 0: - # TODO: Consider padding - raise ValueError("Gratuitous data in sized field") - return ret - return sized_field_codec() - return Sized() - - -class SizedByField(AbstractSizedBy): - - def __init__(self, value, adjust=0): - self.by = value - self.adjust = adjust - - def set_size(self, pkt, size): - setattr(pkt, self.by, size - self.adjust) - - def get_size(self, pkt): - return pkt._vals[self.by] + self.adjust - - -class SizedByMethods(AbstractSizedBy): - - def __init__(self, getter, setter='', modifies=''): - self.getter = getter - self.setter = setter - self.by = modifies - - def set_size(self, pkt, size): - getattr(pkt, self.setter)(size) - - def get_size(self, pkt): - return getattr(pkt, self.getter)() - - -class TypedByField_TypeSelect: - - def __init__(self, key, type): # @ReservedAssignment - self.key = key - self.tdesc = type - - -class TypedByField(Annotation): - - def __init__(self, by, map="", types=[]): # @ReservedAssignment - self.by = by - self.map = map - self.types = types - - def find_accepting_type(self, val): - for td in self.fwd_map.values(): - try: - td.validate(val) - return td.type() - except Exception: - pass - return None - - def get_factory(self): - annot = self - - class InjectedTyper: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class typer_field_codec: - - def encode_field(self, buf, pkt, val): - typed_val = pkt._vals[annot.field.name] - try: - idx = annot.inv_map[typed_val.__class__] - except KeyError as e: - cls = annot.find_accepting_type(typed_val) - if cls is None: - raise e - idx = annot.inv_map[cls] - setattr(pkt, annot.by, idx) - n.encode_field(buf, pkt, idx) - - def decode_field(self, pkt, buf, count, factory): - return n.decode_field(pkt, buf, count, factory) - return typer_field_codec() - - class Typed: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - annot.fwd_map = OrderedDict() - if annot.map != '': - annot.fwd_map.update(getattr(pktCls, annot.map)) - for ts in annot.types: - annot.fwd_map[ts.key] = ts.tdesc - annot.inv_map = OrderedDict() - nextMap = OrderedDict() - for k, v in annot.fwd_map.items(): - t = v.type() - annot.inv_map[t] = k - if issubclass(t, Packet): - codec.register_packet_cls(t) - nextMap[t] = PacketCodec.build_chain_for_type( - pktCls, fld, v, codec, copy(chain)) - codec.inject_chain( - pktCls, pktCls._fields_by_name[annot.by], InjectedTyper(), - fld, annot) - - class typed_field_codec: - - def encode_field(self, buf, pkt, val): - try: - n = nextMap[val.__class__] - except KeyError as e: - cls = annot.find_accepting_type(val) - if cls is None: - raise e - n = nextMap[cls] - n.encode_field(buf, pkt, val) - - def decode_field(self, pkt, buf, count, factory): - idx = pkt._vals[annot.by] - tdesc = annot.fwd_map[idx] - n = nextMap[tdesc.type()] - return n.decode_field(pkt, buf, count, factory) - return typed_field_codec() - return Typed() - - -class TypedByLookahead(Annotation): - - def __init__(self, value=[]): - self.types = value - - def get_factory(self): - annot = self - - class Typed: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - nextMap = OrderedDict() - for td in annot.types: - t = td.type() - if issubclass(t, Packet): - codec.register_packet_cls(t) - nextMap[t] = PacketCodec.build_chain_for_type( - pktCls, fld, td, codec, copy(chain)) - - class typed_field_codec: - - def encode_field(self, buf, pkt, val): - nextMap[val.__class__].encode_field(buf, pkt, val) - - def decode_field(self, pkt, buf, count, factory): - errs = OrderedDict() - backup = codec.backup(buf) - for t, n in nextMap.items(): - try: - return n.decode_field(pkt, buf, count, factory) - except Exception as e: - codec.restore(buf, backup) - errs[t] = e.message - raise PacketDecodeError( - "No selectable lookahead type succeeded " + - "for %r: %r" % ( - fld, errs)) - return typed_field_codec() - return Typed() - - -class OptionalField(Annotation): - - def __init__(self): - pass - - def get_factory(self): - class Optional: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class optional_field_codec: - - def encode_field(self, buf, pkt, val): - if val is None: - return - n.encode_field(buf, pkt, val) - - def decode_field(self, pkt, buf, count, factory): - if codec.measure_decode_remaining(buf) == 0: - return None - return n.decode_field(pkt, buf, count, factory) - return optional_field_codec() - return Optional() - - -class RepeatedField(Annotation): - - def __init__(self, container=None, elements=None): - self.container = container - self.elements = elements - - def get_factory(self): - annot = self - - class Repeated: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - tdesc = annot.container or tdesc - if issubclass(tdesc.type(), java.util.Collection): - edesc = annot.elements or tdesc.resolve_targs( - java.util.Collection)['E'] - else: - raise TypeError( - "Can't encode using type %r on repeated field" % ( - tdesc,)) - n = PacketCodec.build_chain_for_type( - pktCls, fld, edesc, codec, chain) - - class repeated_field_codec: - - def encode_field(self, buf, pkt, val): - for e in val: - n.encode_field(buf, pkt, e) - - def decode_field(self, pkt, buf, count, factory): - it = itertools.count() if count is None else xrange( - count) - t = tdesc.type() - result = factory.new_collection(t) - for i in it: # @UnusedVariable - if codec.measure_decode_remaining(buf) == 0: - break - t.add_to( - result, - n.decode_field(pkt, buf, None, factory)) - return result - return repeated_field_codec() - return Repeated() - - -class EncodeChars(Annotation): - - def __init__(self, value): - self.charset = value - - def get_factory(self): - annot = self - - class Encoded: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - if not issubclass(tdesc.type(), java.lang.String): - return None - # Test that the charset exists - ''.encode(self.charset) - n = PacketCodec.build_chain_for_type( - pktCls, fld, bytearray_t, chain) - - class encoded_field_codec: - - def encode_field(self, buf, pkt, val): - n.encode_field(buf, pkt, val.encode(annot.charset)) - - def decode_field(self, pkt, buf, count, factory): - return n.decode_field( - pkt, buf, count, factory).decode(annot.charset) - return encoded_field_codec() - return Encoded() - - -class BitmaskEncoded(Annotation): - - def __init__(self, universe, type=typedesc(lambda: java.lang.Long)): - self.universe = universe - self.type = type - - def get_factory(self): - annot = self - - class Encoded: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, codec, chain): - if not issubclass(tdesc.type(), BitmaskSet): - return None - n = PacketCodec.build_chain_for_type( - pktCls, fld, annot.type, codec, chain) - - class encoded_field_codec: - - def encode_field(self, buf, pkt, val): - n.encode_field(buf, pkt, val.bitmask) - - def decode_field(self, pkt, buf, count, factory): - return BitmaskSet(annot.universe.type(), n.decode_field(pkt, buf, count, factory)) - return encoded_field_codec() - return Encoded() - - -class WithFlag_Mode: - pass - - -WithFlag_Mode.PRESENT = WithFlag_Mode() -WithFlag_Mode.ABSENT = WithFlag_Mode() - - -class WithFlag(Annotation): - def __init__(self, by, flag, mode=WithFlag_Mode.PRESENT): - self.by = by - self.flag = flag - self.mode = mode - - def get_factory(self): - annot = self - - class InjectedFlags: - def __init__(self, universe): - self.universe = universe - self.flag = getattr(universe, annot.flag) - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, codec, chain): - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - injector = self - - class flags_field_codec: - - def encode_field(self, buf, pkt, val): - flagged_val = pkt._vals[annot.field.name] - if val is None: - val = BitmaskSet(injector.universe, 0) - setattr(pkt, annot.by, val) - - if (flagged_val is not None) ^ (annot.mode != WithFlag_Mode.PRESENT): - val.add(injector.flag) - else: - val.discard(injector.flag) - return n.encode_field(buf, pkt, val) - - def decode_field(self, buf, pkt, count, factory): - return n.decode_field(pkt, buf, count, factory) - return flags_field_codec() - - class Flagged: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, codec, chain): - byField = pktCls._fields_by_name[annot.by] - bme = BitmaskEncoded.findOnField(byField) - universe = bme.universe.type() - codec.inject_chain( - pktCls, byField, InjectedFlags(universe), fld, annot) - flag = getattr(universe, annot.flag) - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class flagged_field_codec: - - def encode_field(self, buf, pkt, val): - if val is not None: - n.encode_field(buf, pkt, val) - - def decode_field(self, pkt, buf, count, factory): - flags = pkt._vals[annot.by] - if (flag in flags) ^ (annot.mode != WithFlag_Mode.PRESENT): - return n.decode_field(pkt, buf, count, factory) - else: - return None - return flagged_field_codec() - return Flagged() diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/binary.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/binary.py deleted file mode 100644 index ce12c47742..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/binary.py +++ /dev/null @@ -1,178 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -''' -The binary packet codec and supporting objects - -This also contains annotations particular to the binary packet codec. -Those are implemented via classes nested in the codec itself. -''' - -from __future__ import print_function - -from ghidra.comm.packet import (BinaryBuffer, PacketDecodeError, Packet, - PacketCodec, bytearray_t) - -from .annot import Annotation - - -class BinaryPacketCodec(PacketCodec): - ''' - The binary packet codec - - This encodes to Python ``bytes`` objects, using ``bytearray`` as - the temporary buffers. Also, because the encoding and decoding to - binary is implemented in the types themselves, this codec simply - delegates to those routines. - ''' - - def finish_encode(self, buf): - return bytes(buf.buf) - - def get_factory_for_annotation(self, pktCls, fld, annot): - if isinstance(annot, SequenceTerminated): - class Terminated: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class terminated_field_codec: - - def encode_field(self, buf, pkt, val): - n.encode_field(buf, pkt, val) - if annot.cond != '': - if getattr(pkt, annot.cond) is None: - return - buf.buf.extend(annot.tok) - - def decode_field(self, pkt, buf, count, factory): - i = buf.buf.find(annot.tok) - if i == -1: - if annot.cond != '': - return n.decode_field(pkt, buf, count, - factory) - elif len(buf.buf) == 0: - raise PacketDecodeError("Buffer is empty") - else: - raise PacketDecodeError( - "Missing terminator sequence %r" % ( - annot.tok)) - part = BinaryBuffer(buf.buf[:i]) - buf.buf[:] = buf.buf[i + len(annot.tok):] - ret = n.decode_field(pkt, part, count, factory) - return ret - return terminated_field_codec() - return Terminated() - elif isinstance(annot, ReverseByteOrder): - class Reversed: - - def get_wrapped_field_codec_for_type(self, pktCls, fld, tdesc, - codec, chain): - n = PacketCodec.build_chain_for_type( - pktCls, fld, tdesc, codec, chain) - - class byte_reversed_field_codec: - - def encode_field(self, buf, pkt, val): - orig = buf.order() - buf.order('<' if orig == '>' else '>') - n.encode_field(buf, pkt, val) - buf.order(orig) - - def decode_field(self, pkt, buf, count, factory): - orig = buf.order() - buf.order('<' if orig == '>' else '>') - ret = n.decode_field(pkt, buf, count, factory) - buf.order(orig) - return ret - return byte_reversed_field_codec() - return Reversed() - return PacketCodec.get_factory_for_annotation(self, pktCls, fld, annot) - - def get_field_codec_for_type(self, pktCls, fld, tdesc): - if issubclass(tdesc.type(), Packet): - return PacketCodec.get_field_codec_for_type(self, pktCls, fld, - tdesc) - elif tdesc == bytearray_t: - class bytearray_field_codec: - - def encode_field(self, buf, pkt, val): - buf.buf.extend(val) - - def decode_field(self, pkt, buf, count, factory): - ret = bytearray(buf.buf) - buf.buf[:] = '' - return ret - return bytearray_field_codec() - else: - class field_codec: - - def encode_field(self, buf, pkt, val): - tdesc.encode_binary(buf, val) - - def decode_field(self, pkt, buf, count, factory): - return tdesc.decode_binary(buf, count) - return field_codec() - - def measure_decode_remaining(self, buf): - return len(buf.buf) - - def new_decode_buffer(self, data): - return BinaryBuffer(data) - - def new_encode_buffer(self): - return BinaryBuffer() - - def backup(self, buf): - return bytearray(buf.buf) - - def restore(self, buf, backup): - buf.buf[:] = backup - - -class BinaryLEPacketCodec(BinaryPacketCodec): - - def new_decode_buffer(self, data): - ret = BinaryBuffer(data) - ret.order('<') - return ret - - def new_encode_buffer(self): - ret = BinaryBuffer() - ret.order('<') - return ret - -# Codec-specific annotations below - - -class SequenceTerminated(Annotation): - - def __init__(self, value, cond=''): - self.tok = value - self.cond = cond - - -class NullTerminated(SequenceTerminated): - - def __init__(self, value=1, cond=''): - SequenceTerminated.__init__(self, '\x00' * value, cond) - - -class ReverseByteOrder(Annotation): - - def __init__(self): - pass diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/string.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/string.py deleted file mode 100644 index 93bec5a35c..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/packet/string.py +++ /dev/null @@ -1,63 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -''' -Placeholders for annotations particular to string packet codecs - -The string packet codec is not yet ported to Python, so there is no -sense in exporting string-based packet types. However, the test packet -types are annotated with binary and string codecs in mind, so these -annotations must be defined. -''' -from .annot import Annotation - - -class RegexSeparated(Annotation): - - def __init__(self, exp, tok, optional=True): - self.exp = exp - self.optional = optional - self.tok = tok - - -class RegexTerminated(Annotation): - - def __init__(self, exp, tok, cond=''): - self.exp = exp - self.cond = cond - self.tok = tok - - -class SizeRestricted_PadDirection: - LEFT = 0 - NONE = 1 - RIGHT = 2 - - -class SizeRestricted(Annotation): - - def __init__(self, direction=SizeRestricted_PadDirection.NONE, pad=' ', - value=None, min=0, max=None): # @ReservedAssignment - self.direction = direction - self.pad = pad - self.value = value - self.min = min - self.max = max - - -class WithRadix(Annotation): - - def __init__(self, value): - self.value = value diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/util.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/util.py deleted file mode 100644 index 5eaaf780c1..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/ghidra/comm/util.py +++ /dev/null @@ -1,70 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -from java.lang import (Object, Enum) - - -class BitmaskSet(Object): - def __init__(self, universe, bitmask=0): - if not issubclass(universe, Enum): - raise TypeError("BitmaskSet must have an Enum as the universe") - self.universe = universe - self.bitmask = bitmask - - def add(self, elem): - if not isinstance(elem, self.universe): - raise TypeError("Element must be a member of the universe") - self.bitmask |= elem.mask - - def discard(self, elem): - if not isinstance(elem, self.universe): - return - self.bitmask &= ~elem.mask - - @classmethod - def of(cls, universe, *args): - bs = cls(universe, 0) - for a in args: - bs.add(a) - return bs - - @classmethod - def validate(cls, val, E): - if val is None: - return - elif not isinstance(val, cls): - raise TypeError("Value must be a %s. Got %r" % (cls.__name__, val)) - elif not val.universe == E.type(): - raise TypeError("Must be a set of %s. Got %r" % - (E.type(), val.universe)) - - def __contains__(self, elem): - if not isinstance(elem, self.universe): - return False - else: - return (self.bitmask & elem.mask) != 0 - - def __eq__(self, other): - if isinstance(other, BitmaskSet): - return self.universe == other.universe and self.bitmask == self.bitmask - return self.issubset(other) and other.issubset(self) - - def __iter__(self): - for elem in self.universe.values: - if elem in self: - yield elem - - def __repr__(self): - return '[%s]' % (', '.join(self)) diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/__init__.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/__init__.py deleted file mode 100644 index 8c2a3298a5..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/lang.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/lang.py deleted file mode 100644 index a2aa17681f..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/lang.py +++ /dev/null @@ -1,225 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -''' -Classes corresponding to common Java types - -These are not by any means ports of the corresponding Java types. -Instead, they just populate the same name space so that exporting need -not map things over. They also contain binary encoding and decoding -logic. -''' - - -class ObjectMeta(type): - - def __repr__(self): - return self.__module__ + '.' + self.__name__ - - def __new__(self, classname, bases, classdict): - if len(bases) == 1: - classdict['sup'] = bases[0] - elif classname == 'Object': - pass - else: - raise TypeError("Too many bases for %s: %r" % (classname, bases,)) - return type.__new__(self, classname, bases, classdict) - - -class Object: - __metaclass__ = ObjectMeta - - @classmethod - def validate(cls, val): - pass - - @classmethod - def encode_binary(cls, buf, val): - raise ValueError("Cannot encode as %r" % (cls,)) - - @classmethod - def decode_binary(cls, buf, count): - raise ValueError("Cannot decode as %r" % (cls,)) - - @classmethod - def subst_targs(cls): - return {} - - @classmethod - def resolve_targs(cls, sup, **targs): - cur = cls - while cur != sup: - if cur == Object: - raise ValueError( - "%r is not in the hierarchy of %r" % (sup, cls)) - targs = cur.subst_targs(**targs) - cur = cur.sup - return targs - - -class Number(Object): - - @classmethod - def validate(cls, val): - if val is None: - return - elif not isinstance(val, (int, long, float)): - raise TypeError( - "Value for %s for be numeric. Got %r" % (cls.__name__, val)) - - @classmethod - def encode_binary(cls, buf, val): - buf.put_Struct(val, cls.struct) - - @classmethod - def decode_binary(cls, buf, count): - return buf.get_Struct(cls.struct) - - -class IntegralNumber(Number): - - @classmethod - def validate(cls, val): - if val is None: - return - elif not isinstance(val, (int, long)): - raise TypeError( - "Value for %s must be integral. Got %r" % (cls.__name__, val)) - elif not cls.min <= val < cls.max: - raise ValueError("Value for %s must be in [%s, %s). Got %r" % ( - cls.__name__, cls.min, cls.max, val)) - - -class Boolean(Object): - - @classmethod - def validate(cls, val): - if val is None: - return - elif not isinstance(val, bool): - raise TypeError( - "Value for %s must be boolean. Got %r" % (cls.__name__, val)) - - @classmethod - def encode_binary(cls, buf, val): - buf.put_Boolean(val) - - @classmethod - def decode_binary(cls, buf, count): - return buf.get_Boolean() - - -class Byte(IntegralNumber): - min = -0x80 - max = 0x80 - struct = 'b' - - -class Character(Object): - - @classmethod - def validate(cls, val): - if val is None: - return - elif not isinstance(val, (str, unicode)) or not len(val) == 1: - raise ValueError( - "Value for %r must be a string of length 1. Got %r" % ( - cls.__name__, val)) - - @classmethod - def encode_binary(cls, buf, val): - buf.put_Character(val) - - @classmethod - def decode_binary(cls, buf, count): - return buf.get_Character() - - -class Short(IntegralNumber): - min = -0x8000 - max = 0x8000 - struct = 'h' - - -class Integer(IntegralNumber): - min = -0x80000000 - max = 0x80000000 - struct = 'i' - - -class Long(IntegralNumber): - min = -0x8000000000000000 - max = 0x8000000000000000 - struct = 'q' - - -class Float(Number): - struct = 'f' - - -class Double(Number): - struct = 'd' - - -class Enum(Object): - def __repr__(self): - return self.name - - @classmethod - def validate(cls, val): - if val is None: - return - elif not isinstance(val, cls): - raise ValueError("Value must be from the enumeration %r" % (cls,)) - - @classmethod - def encode_binary(cls, buf, val): - if len(cls.values) < 0x100: - buf.put_Struct(val.ordinal, 'B') - elif len(cls.values) < 0x10000: - buf.put_Struct(val.ordinal, 'H') - elif len(cls.values) < 0x100000000: - buf.put_Struct(val.ordinal, 'I') - else: - buf.put_Struct(val.ordinal, 'Q') - - @classmethod - def decode_binary(cls, buf, count): - if len(cls.values) < 0x100: - return cls.values[buf.get_Struct('B')] - elif len(cls.values) < 0x10000: - return cls.values[buf.get_Struct('H')] - elif len(cls.vaules) < 0x100000000: - return cls.values[buf.get_Struct('I')] - else: - return cls.values[buf.get_Struct('Q')] - - -class String(Object): - - @classmethod - def validate(cls, val): - if val is None: - return - elif not isinstance(val, (str, unicode)): - raise ValueError("Value must be a string") - - @classmethod - def encode_binary(cls, buf, val): - buf.put_String(val) - - @classmethod - def decode_binary(cls, buf, count): - return buf.get_String(count) diff --git a/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/util.py b/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/util.py deleted file mode 100644 index 01805fad28..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/main/py/java/util.py +++ /dev/null @@ -1,70 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -''' -Classes corresponding to common Java collection types - -These are not ports by any means of the corresponding Java types. -Instead, they just populate the same name space so that exporting need -not map things over. I also use them as convenient places to implement -a common interface. -''' - -from java.lang import Object - - -class Collection(Object): - - @classmethod - def validate(cls, val, **targs): - E = cls.resolve_targs(Collection, **targs)['E'] - if not hasattr(val, '__iter__') or not hasattr(val, '__len__'): - raise ValueError( - "Value for %s must behave list a collection " + - "(__iter__ and __len__)") - for e in val: - E.validate(e) - - @classmethod - def subst_targs(cls, E): - return {} - - -class List(Collection): - - @classmethod - def validate(cls, val, **targs): - E = cls.resolve_targs(List, **targs)['E'] - if not hasattr(val, '__getitem__') or not hasattr(val, '__len__'): - raise ValueError( - "Value for %s must behave list a list " + - "(__getitem__ and __len__)") - for e in val: - E.validate(e) - - @classmethod - def subst_targs(cls, E): - return dict(E=E) - - @classmethod - def add_to(cls, l, item): - l.append(item) - - -class ArrayList(List): - - @classmethod - def new(cls): - return list() diff --git a/Ghidra/Debug/Framework-AsyncComm/src/test/py/classes/ghidra/comm/tests/packet/_class_PacketTestClasses_TestMessageFullSpecColField_IntList.py b/Ghidra/Debug/Framework-AsyncComm/src/test/py/classes/ghidra/comm/tests/packet/_class_PacketTestClasses_TestMessageFullSpecColField_IntList.py deleted file mode 100644 index a05696f467..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/test/py/classes/ghidra/comm/tests/packet/_class_PacketTestClasses_TestMessageFullSpecColField_IntList.py +++ /dev/null @@ -1,26 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -from ghidra.comm.packet import typedesc - -import java.lang -import java.util - - -class PacketTestClasses_TestMessageFullSpecColField_IntList(java.util.ArrayList): - - @classmethod - def subst_targs(cls): - return dict(E=typedesc(lambda: java.lang.Integer)) diff --git a/Ghidra/Debug/Framework-AsyncComm/src/test/py/ghidra/comm/test_packet.py b/Ghidra/Debug/Framework-AsyncComm/src/test/py/ghidra/comm/test_packet.py deleted file mode 100644 index e052103fac..0000000000 --- a/Ghidra/Debug/Framework-AsyncComm/src/test/py/ghidra/comm/test_packet.py +++ /dev/null @@ -1,372 +0,0 @@ -## ### -# IP: GHIDRA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -## -from test import test_support -import unittest - -from .packet import Packet, DefaultPacketFactory, DEFAULT_FACTORY -from .packet.binary import BinaryPacketCodec -from .tests.packet import * -from .util import BitmaskSet - - -def update_list(left, right): - for i, (l, r) in enumerate(zip(left, right)): - if isinstance(l, Packet) and isinstance(r, dict): - update_packet(l, r) - elif isinstance(l, list) and isinstance(r, list): - assert len(l) == len(r) - update_list(l, r) - else: - left[i] = r - - -def update_packet(pkt, vals): - for k, v in vals.items(): - cur = getattr(pkt, k) - if isinstance(cur, Packet) and isinstance(v, dict): - update_packet(cur, v) - elif isinstance(cur, list) and isinstance(v, list): - assert len(k) == len(v) - update_list(cur, v) - else: - setattr(pkt, k, v) - - -class BinaryPacketCodecTest(unittest.TestCase): - - factory = DEFAULT_FACTORY - - def build_dec(self): - if isinstance(self.dec_parts, tuple): - return self.pkt_cls(*self.dec_parts) - else: - return self.pkt_cls(**self.dec_parts) - - def setUp(self): - self.codec = BinaryPacketCodec() - self.codec.register_packet_cls(self.pkt_cls) - self.factory.register_types(self.codec) - self.enc = bytes(''.join(self.enc_parts)) - self.pkt = self.build_dec() - if hasattr(self, 'dec_extra'): - self.dec = self.build_dec() - update_packet(self.dec, self.dec_extra) - else: - self.dec = self.pkt - - def test_encode(self): - enc = self.codec.encode_packet(self.pkt) - self.assertEquals(self.enc, enc) - - def test_decode(self): - dec = self.codec.decode_packet(self.pkt_cls, self.enc, self.factory) - self.assertEquals(self.dec, dec) - - -class TestFlatTypes(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageFlatTypes - enc_parts = ('\x01', '\x02', '\x00\x33', '\x00\x04', '\x00\x00\x00\x05', - '\x00\x00\x00\x00\x00\x00\x00\x06', 'seven\x00', - '\x41\x00\x00\x00', '\x40\x22\x00\x00\x00\x00\x00\x00') - dec_parts = (True, 2, '3', 4, 5, 6, 'seven', 8, 9) - - -class TestFlatMixedEndian(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageFlatMixedEndian - enc_parts = ('\x00\x31', '\x32\x00', '\x00\x00\x00\x03', - '\x04\x00\x00\x00', 'five\x00', 'six\x00\x00', - '\x00\x00\x00\x00\x00\x00\x00\x07', - '\x08\x00\x00\x00\x00\x00\x00\x00', - '\x41\x10\x00\x00', '\x00\x00\x20\x41', - '\x40\x26\x00\x00\x00\x00\x00\x00', - '\x00\x00\x00\x00\x00\x00\x28\x40') - dec_parts = ('1', '2', 3, 4, 'five', 'six', 7, 8, 9, 10, 11, 12) - - -class TestSizedString(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageSizedString - enc_parts = ('\x00\x00\x00\x07', 'Testing', '\x00\x00\x00\x04') - dec_parts = dict(str='Testing', more=4) - dec_extra = dict(len=7) - - -class TestMethodSizedString(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageMethodSizedString - enc_parts = ('\x00\x00\x00\x0c', 'Testing2') - dec_parts = dict(str='Testing2') - dec_extra = dict(len=12) - - def setUp(self): - BinaryPacketCodecTest.setUp(self) - - def getLen(self): - return self.len - 4 - - def setLen(self, len): - self.len = len + 4 - - PacketTestClasses_TestMessageMethodSizedString.getLen = getLen - PacketTestClasses_TestMessageMethodSizedString.setLen = setLen - - -class TestCountedShortArray(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageCountedShortArray - enc_parts = ('\x00\x00\x00\x04', '\x00\x00', '\x01\x00', - '\x02\x00', '\x03\x00', '\x00\x00\xbe\xef') - dec_parts = dict(more=0xbeef, arr=[0, 1, 2, 3]) - dec_extra = dict(count=4) - - -class TestDynamicTyped1(BinaryPacketCodecTest): - # NOTE: Not sure how to test/implement the plain TestMessageDynamicTyped - # The type of the field takes Integer or Long, which are indistinguishable - # in Python - pkt_cls = PacketTestClasses_TestMessageDynamicTypedSubs - enc_parts = ('\x00\x00\x00\x01', '\x00\x00\x00\x03') - dec_parts = dict( - sub=PacketTestClasses_TestMessageDynamicTypedSubs_IntTestMessage(3)) - dec_extra = dict(type=1) - - -class TestDynamicTyped2(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageDynamicTypedSubs - enc_parts = ('\x00\x00\x00\x02', '\x00\x00\x00\x00\x00\x00\x00\x04') - dec_parts = dict( - sub=PacketTestClasses_TestMessageDynamicTypedSubs_LongTestMessage(4)) - dec_extra = dict(type=2) - - -class TestUnmeasuredCollection(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageUnmeasuredCollection - enc_parts = ('\x02', '\x00\x07', 'Testing', - '\x01', '\x00\x04', '\x00\x00\x00\x01', - '\x01', '\x00\x04', '\x00\x00\x00\x02') - dec_parts = ([ - PacketTestClasses_TestMessageUnmeasuredCollection_TestElement( - val='Testing'), - PacketTestClasses_TestMessageUnmeasuredCollection_TestElement(val=1), - PacketTestClasses_TestMessageUnmeasuredCollection_TestElement(val=2), - ],) - dec_extra = dict(col=[ - dict(type=2, len=7), - dict(type=1, len=4), - dict(type=1, len=4), - ]) - - -class TestFullSpecColField(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageFullSpecColField - enc_parts = ('\x00\x00\x00\x00', '\x00\x00\x00\x01', - '\x00\x00\x00\x02', '\x00\x00\x00\x03') - dec_parts = ([0, 1, 2, 3],) - - -class TestLookahead1(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageLookahead - enc_parts = ('Int', '\x00\x00\x00\x03') - dec_parts = (PacketTestClasses_TestMessageLookahead_IntTestMessage(3),) - - -class TestLookahead2(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageLookahead - enc_parts = ('Long', '\x00\x00\x00\x00\x00\x00\x00\x04') - dec_parts = (PacketTestClasses_TestMessageLookahead_LongTestMessage(4),) - - -class TestDoubleTermed(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageDoubleTermed - enc_parts = ('\x00\x00\x00\x07', 'Testing\x00') - dec_parts = dict(str='Testing') - dec_extra = dict(len=7) - - -class TestOptional1(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageOptional - enc_parts = ('SomeString',) - dec_parts = dict(f1='SomeString') - - -class TestOptional2(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageOptional - enc_parts = ('SomeString\x00', '\x00\x00\x00\x33') - dec_parts = dict(f1='SomeString', opt=51) - - -class TestEnum1(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageEnum - enc_parts = ('\x01', '\x00\x00\x00\x33') - dec_parts = (PacketTestClasses_TestMessageEnum_TestEnum.ON, 51) - - -class TestEnum2(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageEnum - enc_parts = ('\x02', '\x00\x00\x00\x33') - dec_parts = (PacketTestClasses_TestMessageEnum_TestEnum.ONE, 51) - - -class LongFactory(DefaultPacketFactory): - - def register_types(self, codec): - codec.register_packet_cls(PacketTestClasses_LongTestNumber) - - def new_packet(self, pktCls): - #print("new_packet: %r" % (pktCls)) - if pktCls == PacketTestClasses_AbstractTestNumber: - #print(" Provided LongTestNumber") - return PacketTestClasses_LongTestNumber() - #print(" Provided default") - return DefaultPacketFactory.new_packet(self, pktCls) - - -class TestAbstract1(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageAbstractTestNumber - enc_parts = ( - '\x00\x00\x00\x02', - '\x00\x00\x00\x00\x00\x00\x00\x05', - '\x00\x00\x00\x00\x00\x00\x00\x06', - '\x00\x00\x30\x39', - ) - dec_parts = dict( - numbers=[ - PacketTestClasses_LongTestNumber(5), - PacketTestClasses_LongTestNumber(6), - ], - follows=12345, - ) - - factory = LongFactory() - dec_extra = dict(count=2) - - -class IntFactory(DefaultPacketFactory): - - def register_types(self, codec): - codec.register_packet_cls(PacketTestClasses_IntTestNumber) - - def new_packet(self, pktCls): - if pktCls == PacketTestClasses_AbstractTestNumber: - return PacketTestClasses_IntTestNumber() - return DefaultPacketFactory.new_packet(self, pktCls) - - -class TestAbstract2(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageAbstractTestNumber - enc_parts = ( - '\x00\x00\x00\x02', - '\x00\x00\x00\x05', - '\x00\x00\x00\x06', - '\x00\x00\x30\x39', - ) - dec_parts = dict( - numbers=[ - PacketTestClasses_IntTestNumber(5), - PacketTestClasses_IntTestNumber(6), - ], - follows=12345, - ) - - factory = IntFactory() - dec_extra = dict(count=2) - - -class TestTypedByMap1(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageTypedByMap - enc_parts = ('\x00', '\x00\x00\x00\x05') - dec_parts = dict( - sub=PacketTestClasses_TestMessageTypedByMap_TestASubByMap(5)) - dec_extra = dict(type=PacketTestClasses_TestMessageTypedByMap_TestEnum.A) - - -class TestTypedByMap2(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageTypedByMap - enc_parts = ('\x01', '\x00\x00\x00\x00\x00\x00\x00\x06') - dec_parts = dict( - sub=PacketTestClasses_TestMessageTypedByMap_TestBSubByMap(6)) - dec_extra = dict(type=PacketTestClasses_TestMessageTypedByMap_TestEnum.B) - - -class TestFlags1(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageFlags - enc_parts = ('\x02') - dec_parts = dict() - dec_extra = dict(flags=BitmaskSet.of( - PacketTestClasses_TestMessageFlags_TestFlags, - PacketTestClasses_TestMessageFlags_TestFlags.SECOND - )) - - -class TestFlags2(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageFlags - enc_parts = ('\x03', '\x11\x22\x33\x44\x55\x66\x77\x88') - dec_parts = dict(first=0x1122334455667788L) - dec_extra = dict(flags=BitmaskSet.of( - PacketTestClasses_TestMessageFlags_TestFlags, - PacketTestClasses_TestMessageFlags_TestFlags.FIRST, - PacketTestClasses_TestMessageFlags_TestFlags.SECOND, - )) - - -class TestFlags3(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageFlags - enc_parts = ('\x00', '\x12\x34\x56\x78') - dec_parts = dict(second=0x12345678) - dec_extra = dict(flags=BitmaskSet.of( - PacketTestClasses_TestMessageFlags_TestFlags, - )) - - -class TestFlags4(BinaryPacketCodecTest): - pkt_cls = PacketTestClasses_TestMessageFlags - enc_parts = ('\x01', '\x11\x22\x33\x44\x55\x66\x77\x88', - '\x12\x34\x56\x78') - dec_parts = dict(first=0x1122334455667788L, second=0x12345678) - dec_extra = dict(flags=BitmaskSet.of( - PacketTestClasses_TestMessageFlags_TestFlags, - PacketTestClasses_TestMessageFlags_TestFlags.FIRST, - )) - - -def test_main(): - test_support.run_unittest( - TestFlatTypes, - TestFlatMixedEndian, - TestSizedString, - TestMethodSizedString, - TestCountedShortArray, - TestDynamicTyped1, - TestDynamicTyped2, - TestUnmeasuredCollection, - TestFullSpecColField, - TestLookahead1, - TestLookahead2, - TestDoubleTermed, - TestOptional1, - TestOptional2, - TestEnum1, - TestEnum2, - TestAbstract1, - TestAbstract2, - TestTypedByMap1, - TestTypedByMap2, - TestFlags1, - TestFlags2, - TestFlags3, - TestFlags4, - ) - - -if __name__ == '__main__': - test_main()