edgeserve.message_format
1import uuid 2from typing import List, Optional 3 4from pulsar.schema import Record, String, Bytes 5 6 7class MessageFormat(Record): 8 source_id = String() 9 payload = Bytes(required=True) 10 11 12class BaseBytesCodec: 13 def __init__(self) -> None: 14 pass 15 16 def encode(self, list_of_bytes: List[bytes]) -> bytes: 17 return b''.join(list_of_bytes) 18 19 def decode(self, message_bytes: bytes) -> List[bytes]: 20 raise NotImplementedError 21 22 23class NetworkCodec(BaseBytesCodec): 24 def __init__(self, header_size: int) -> None: 25 super().__init__() 26 self.header_size = header_size 27 28 def encode(self, list_of_bytes: List[bytes]) -> bytes: 29 assert len(list_of_bytes) == 2 30 return b''.join(list_of_bytes) 31 32 def decode(self, message_bytes: bytes) -> List[bytes]: 33 header_bytes = message_bytes[:self.header_size] 34 data_bytes = message_bytes[self.header_size:] 35 return [header_bytes, data_bytes] 36 37 38class GraphCodec: 39 def __init__(self, msg_uuid_size: int = 16, op_from_size: int = 16, header_size: int = 0) -> None: 40 """The GraphCodec is a codec that is designed to encode and decode messages for a graph-based system. 41 The graph-based system is a directed graph where each node is an operator (data stream / model / destination). 42 43 Args: 44 msg_uuid_size (int, optional): The size of the message id. Defaults to 16. 45 op_from_size (int, optional): The size of the upstream operator id. Defaults to 16. 46 header_size (int, optional): The size of the task-specific header. Defaults to 0. 47 """ 48 super().__init__() 49 self.msg_uuid_size = msg_uuid_size 50 self.op_from_size = op_from_size 51 self.header_size = header_size 52 53 def encode(self, msg_uuid: uuid.UUID, op_from: str, payload: bytes, header: Optional[bytes] = None) -> bytes: 54 """The encode method should return a single bytes object that represents the message. 55 The length of each field is fixed and determined by the `msg_uuid_size`, `op_from_size`, and `header_size` 56 parameters in the constructor. 57 58 Args: 59 msg_uuid (UUID): The message UUID. 60 op_from (str): The id of the upstream op. 61 payload (bytes): The actual payload. 62 header (Optional[bytes]): An optional header for task-specific bookkeeping. 63 64 Returns: 65 bytes: The encoded message. 66 """ 67 if header is None: 68 header = b'' 69 assert len(msg_uuid.bytes) == self.msg_uuid_size 70 assert len(op_from) <= self.op_from_size 71 assert len(header) <= self.header_size 72 73 op_from = op_from.encode('utf-8').ljust(self.op_from_size, b'\x00') 74 header = header.ljust(self.header_size, b'\x00') 75 return b''.join([msg_uuid.bytes, op_from, header, payload]) 76 77 def decode(self, message_bytes: bytes) -> List[bytes]: 78 """The decode method should return a list of bytes, where the first element is the message id, the second 79 element is the operation from, the third element is the header, and the fourth element is the payload. 80 81 Args: 82 message_bytes (bytes): The message bytes to decode. 83 """ 84 msg_uuid_bytes = message_bytes[:self.msg_uuid_size] 85 msg_uuid = uuid.UUID(bytes=msg_uuid_bytes) 86 op_from = message_bytes[self.msg_uuid_size:self.msg_uuid_size + self.op_from_size] 87 header = message_bytes[ 88 self.msg_uuid_size + self.op_from_size:self.msg_uuid_size + self.op_from_size + self.header_size] 89 payload = message_bytes[self.msg_uuid_size + self.op_from_size + self.header_size:] 90 91 if op_from.find(b'\x00') != -1: 92 op_from = op_from[:op_from.find(b'\x00')] 93 if header.find(b'\x00') != -1: 94 header = header[:header.find(b'\x00')] 95 return [msg_uuid, op_from.decode('utf-8'), header, payload]
class
MessageFormat(pulsar.schema.definition.Record):
class
BaseBytesCodec:
13class BaseBytesCodec: 14 def __init__(self) -> None: 15 pass 16 17 def encode(self, list_of_bytes: List[bytes]) -> bytes: 18 return b''.join(list_of_bytes) 19 20 def decode(self, message_bytes: bytes) -> List[bytes]: 21 raise NotImplementedError
24class NetworkCodec(BaseBytesCodec): 25 def __init__(self, header_size: int) -> None: 26 super().__init__() 27 self.header_size = header_size 28 29 def encode(self, list_of_bytes: List[bytes]) -> bytes: 30 assert len(list_of_bytes) == 2 31 return b''.join(list_of_bytes) 32 33 def decode(self, message_bytes: bytes) -> List[bytes]: 34 header_bytes = message_bytes[:self.header_size] 35 data_bytes = message_bytes[self.header_size:] 36 return [header_bytes, data_bytes]
class
GraphCodec:
39class GraphCodec: 40 def __init__(self, msg_uuid_size: int = 16, op_from_size: int = 16, header_size: int = 0) -> None: 41 """The GraphCodec is a codec that is designed to encode and decode messages for a graph-based system. 42 The graph-based system is a directed graph where each node is an operator (data stream / model / destination). 43 44 Args: 45 msg_uuid_size (int, optional): The size of the message id. Defaults to 16. 46 op_from_size (int, optional): The size of the upstream operator id. Defaults to 16. 47 header_size (int, optional): The size of the task-specific header. Defaults to 0. 48 """ 49 super().__init__() 50 self.msg_uuid_size = msg_uuid_size 51 self.op_from_size = op_from_size 52 self.header_size = header_size 53 54 def encode(self, msg_uuid: uuid.UUID, op_from: str, payload: bytes, header: Optional[bytes] = None) -> bytes: 55 """The encode method should return a single bytes object that represents the message. 56 The length of each field is fixed and determined by the `msg_uuid_size`, `op_from_size`, and `header_size` 57 parameters in the constructor. 58 59 Args: 60 msg_uuid (UUID): The message UUID. 61 op_from (str): The id of the upstream op. 62 payload (bytes): The actual payload. 63 header (Optional[bytes]): An optional header for task-specific bookkeeping. 64 65 Returns: 66 bytes: The encoded message. 67 """ 68 if header is None: 69 header = b'' 70 assert len(msg_uuid.bytes) == self.msg_uuid_size 71 assert len(op_from) <= self.op_from_size 72 assert len(header) <= self.header_size 73 74 op_from = op_from.encode('utf-8').ljust(self.op_from_size, b'\x00') 75 header = header.ljust(self.header_size, b'\x00') 76 return b''.join([msg_uuid.bytes, op_from, header, payload]) 77 78 def decode(self, message_bytes: bytes) -> List[bytes]: 79 """The decode method should return a list of bytes, where the first element is the message id, the second 80 element is the operation from, the third element is the header, and the fourth element is the payload. 81 82 Args: 83 message_bytes (bytes): The message bytes to decode. 84 """ 85 msg_uuid_bytes = message_bytes[:self.msg_uuid_size] 86 msg_uuid = uuid.UUID(bytes=msg_uuid_bytes) 87 op_from = message_bytes[self.msg_uuid_size:self.msg_uuid_size + self.op_from_size] 88 header = message_bytes[ 89 self.msg_uuid_size + self.op_from_size:self.msg_uuid_size + self.op_from_size + self.header_size] 90 payload = message_bytes[self.msg_uuid_size + self.op_from_size + self.header_size:] 91 92 if op_from.find(b'\x00') != -1: 93 op_from = op_from[:op_from.find(b'\x00')] 94 if header.find(b'\x00') != -1: 95 header = header[:header.find(b'\x00')] 96 return [msg_uuid, op_from.decode('utf-8'), header, payload]
GraphCodec( msg_uuid_size: int = 16, op_from_size: int = 16, header_size: int = 0)
40 def __init__(self, msg_uuid_size: int = 16, op_from_size: int = 16, header_size: int = 0) -> None: 41 """The GraphCodec is a codec that is designed to encode and decode messages for a graph-based system. 42 The graph-based system is a directed graph where each node is an operator (data stream / model / destination). 43 44 Args: 45 msg_uuid_size (int, optional): The size of the message id. Defaults to 16. 46 op_from_size (int, optional): The size of the upstream operator id. Defaults to 16. 47 header_size (int, optional): The size of the task-specific header. Defaults to 0. 48 """ 49 super().__init__() 50 self.msg_uuid_size = msg_uuid_size 51 self.op_from_size = op_from_size 52 self.header_size = header_size
The GraphCodec is a codec that is designed to encode and decode messages for a graph-based system. The graph-based system is a directed graph where each node is an operator (data stream / model / destination).
Arguments:
- msg_uuid_size (int, optional): The size of the message id. Defaults to 16.
- op_from_size (int, optional): The size of the upstream operator id. Defaults to 16.
- header_size (int, optional): The size of the task-specific header. Defaults to 0.
def
encode( self, msg_uuid: uuid.UUID, op_from: str, payload: bytes, header: Optional[bytes] = None) -> bytes:
54 def encode(self, msg_uuid: uuid.UUID, op_from: str, payload: bytes, header: Optional[bytes] = None) -> bytes: 55 """The encode method should return a single bytes object that represents the message. 56 The length of each field is fixed and determined by the `msg_uuid_size`, `op_from_size`, and `header_size` 57 parameters in the constructor. 58 59 Args: 60 msg_uuid (UUID): The message UUID. 61 op_from (str): The id of the upstream op. 62 payload (bytes): The actual payload. 63 header (Optional[bytes]): An optional header for task-specific bookkeeping. 64 65 Returns: 66 bytes: The encoded message. 67 """ 68 if header is None: 69 header = b'' 70 assert len(msg_uuid.bytes) == self.msg_uuid_size 71 assert len(op_from) <= self.op_from_size 72 assert len(header) <= self.header_size 73 74 op_from = op_from.encode('utf-8').ljust(self.op_from_size, b'\x00') 75 header = header.ljust(self.header_size, b'\x00') 76 return b''.join([msg_uuid.bytes, op_from, header, payload])
The encode method should return a single bytes object that represents the message.
The length of each field is fixed and determined by the msg_uuid_size
, op_from_size
, and header_size
parameters in the constructor.
Arguments:
- msg_uuid (UUID): The message UUID.
- op_from (str): The id of the upstream op.
- payload (bytes): The actual payload.
- header (Optional[bytes]): An optional header for task-specific bookkeeping.
Returns:
bytes: The encoded message.
def
decode(self, message_bytes: bytes) -> List[bytes]:
78 def decode(self, message_bytes: bytes) -> List[bytes]: 79 """The decode method should return a list of bytes, where the first element is the message id, the second 80 element is the operation from, the third element is the header, and the fourth element is the payload. 81 82 Args: 83 message_bytes (bytes): The message bytes to decode. 84 """ 85 msg_uuid_bytes = message_bytes[:self.msg_uuid_size] 86 msg_uuid = uuid.UUID(bytes=msg_uuid_bytes) 87 op_from = message_bytes[self.msg_uuid_size:self.msg_uuid_size + self.op_from_size] 88 header = message_bytes[ 89 self.msg_uuid_size + self.op_from_size:self.msg_uuid_size + self.op_from_size + self.header_size] 90 payload = message_bytes[self.msg_uuid_size + self.op_from_size + self.header_size:] 91 92 if op_from.find(b'\x00') != -1: 93 op_from = op_from[:op_from.find(b'\x00')] 94 if header.find(b'\x00') != -1: 95 header = header[:header.find(b'\x00')] 96 return [msg_uuid, op_from.decode('utf-8'), header, payload]
The decode method should return a list of bytes, where the first element is the message id, the second element is the operation from, the third element is the header, and the fourth element is the payload.
Arguments:
- message_bytes (bytes): The message bytes to decode.