edgeserve.message_format

 1import uuid
 2from typing import List, Optional
 3
 4from pulsar.schema import Record, String, Bytes
 5
 6
class MessageFormat(Record):
    """Pulsar record schema with two fields: ``source_id`` and ``payload``."""

    # String field declared without `required=True`.
    # NOTE(review): presumably identifies the producer of the message — confirm against callers.
    source_id = String()
    # Bytes field declared with `required=True`; carries the message body.
    payload = Bytes(required=True)
10
11
class BaseBytesCodec:
    """Minimal codec base: joins byte chunks; splitting is left to subclasses."""

    def __init__(self) -> None:
        """Create the codec; the base class keeps no state."""

    def encode(self, list_of_bytes: List[bytes]) -> bytes:
        """Concatenate the chunks of ``list_of_bytes``, in order, into one ``bytes``."""
        joined = bytes().join(list_of_bytes)
        return joined

    def decode(self, message_bytes: bytes) -> List[bytes]:
        """Split an encoded message back into chunks.

        Raises:
            NotImplementedError: Always; the base class defines no framing.
        """
        raise NotImplementedError()
21
22
class NetworkCodec(BaseBytesCodec):
    """Codec for two-part messages: a fixed-size header followed by the data."""

    def __init__(self, header_size: int) -> None:
        """Store the header width used by ``decode``.

        Args:
            header_size (int): Number of leading bytes ``decode`` returns as the header.
        """
        super().__init__()
        self.header_size = header_size

    def encode(self, list_of_bytes: List[bytes]) -> bytes:
        """Concatenate a ``[header, data]`` pair into a single message.

        The header length is not compared with ``header_size``; ``decode`` only
        recovers the original pair when the header is exactly ``header_size``
        bytes long.

        Args:
            list_of_bytes (List[bytes]): Exactly two items, header then data.

        Returns:
            bytes: The header immediately followed by the data.

        Raises:
            AssertionError: If ``list_of_bytes`` does not hold exactly two items.
        """
        if len(list_of_bytes) != 2:
            # Raised explicitly instead of via `assert` so the check is not
            # stripped under `python -O`; the exception type is unchanged.
            raise AssertionError(
                'expected exactly 2 items (header, data), got %d' % len(list_of_bytes)
            )
        return b''.join(list_of_bytes)

    def decode(self, message_bytes: bytes) -> List[bytes]:
        """Split ``message_bytes`` at ``header_size`` into ``[header, data]``.

        A message shorter than ``header_size`` yields a short header and empty data.
        """
        header_bytes = message_bytes[:self.header_size]
        data_bytes = message_bytes[self.header_size:]
        return [header_bytes, data_bytes]
36
37
class GraphCodec:
    """Fixed-layout codec for messages passed between operators of a graph.

    Wire format, in order:

    1. message UUID, ``msg_uuid_size`` bytes;
    2. upstream operator id, UTF-8, NUL-padded to ``op_from_size`` bytes;
    3. task-specific header, NUL-padded to ``header_size`` bytes;
    4. payload, all remaining bytes.
    """

    def __init__(self, msg_uuid_size: int = 16, op_from_size: int = 16, header_size: int = 0) -> None:
        """The GraphCodec is a codec that is designed to encode and decode messages for a graph-based system.
        The graph-based system is a directed graph where each node is an operator (data stream / model / destination).

        Args:
            msg_uuid_size (int, optional): The size of the message id in bytes. Defaults to 16, the length of
                ``uuid.UUID.bytes``; ``encode`` rejects a UUID of any other length.
            op_from_size (int, optional): The size of the upstream operator id field in bytes. Defaults to 16.
            header_size (int, optional): The size of the task-specific header field in bytes. Defaults to 0.
        """
        super().__init__()
        self.msg_uuid_size = msg_uuid_size
        self.op_from_size = op_from_size
        self.header_size = header_size

    def encode(self, msg_uuid: uuid.UUID, op_from: str, payload: bytes, header: Optional[bytes] = None) -> bytes:
        """Serialize one message into a single bytes object.

        The length of each leading field is fixed and determined by the `msg_uuid_size`, `op_from_size`, and
        `header_size` parameters in the constructor; shorter values are right-padded with NUL bytes.

        Args:
            msg_uuid (UUID): The message UUID.
            op_from (str): The id of the upstream op. Its UTF-8 encoding must fit in `op_from_size` bytes.
            payload (bytes): The actual payload.
            header (Optional[bytes]): An optional header for task-specific bookkeeping. ``None`` means empty.

        Returns:
            bytes: The encoded message.

        Raises:
            AssertionError: If the UUID is not `msg_uuid_size` bytes long, or if the encoded `op_from` or the
                `header` is longer than its field.
        """
        if header is None:
            header = b''
        uuid_bytes = msg_uuid.bytes
        # Measure the encoded form: the field width is in bytes, and a non-ASCII
        # id can need more bytes than it has characters.
        op_from_bytes = op_from.encode('utf-8')

        # Checks raise explicitly instead of using `assert` so they are not
        # stripped under `python -O`; the exception type is unchanged.
        if len(uuid_bytes) != self.msg_uuid_size:
            raise AssertionError(
                'msg_uuid is %d bytes, expected %d' % (len(uuid_bytes), self.msg_uuid_size)
            )
        if len(op_from_bytes) > self.op_from_size:
            raise AssertionError(
                'op_from is %d bytes once UTF-8 encoded, limit is %d' % (len(op_from_bytes), self.op_from_size)
            )
        if len(header) > self.header_size:
            raise AssertionError(
                'header is %d bytes, limit is %d' % (len(header), self.header_size)
            )

        return b''.join([
            uuid_bytes,
            op_from_bytes.ljust(self.op_from_size, b'\x00'),
            header.ljust(self.header_size, b'\x00'),
            payload,
        ])

    def decode(self, message_bytes: bytes) -> List:
        """Split an encoded message back into its four fields.

        Args:
            message_bytes (bytes): The message bytes to decode.

        Returns:
            List: ``[msg_uuid, op_from, header, payload]`` where ``msg_uuid`` is a ``uuid.UUID``, ``op_from`` is
            the upstream operator id as ``str``, and ``header`` and ``payload`` are ``bytes``. Trailing NUL
            padding is removed from ``header``, so a header that itself ends in NUL bytes loses them.
        """
        uuid_end = self.msg_uuid_size
        op_from_end = uuid_end + self.op_from_size
        header_end = op_from_end + self.header_size

        msg_uuid = uuid.UUID(bytes=message_bytes[:uuid_end])
        # The operator id is text: everything before the first NUL.
        op_from = message_bytes[uuid_end:op_from_end].partition(b'\x00')[0]
        # The header is binary and may contain NUL bytes, so only the trailing
        # padding is removed rather than cutting at the first NUL.
        header = message_bytes[op_from_end:header_end].rstrip(b'\x00')
        payload = message_bytes[header_end:]
        return [msg_uuid, op_from.decode('utf-8'), header, payload]
class MessageFormat(pulsar.schema.definition.Record):
class MessageFormat(Record):
    """Pulsar record schema with two fields: ``source_id`` and ``payload``."""

    # String field declared without `required=True`.
    # NOTE(review): presumably identifies the producer of the message — confirm against callers.
    source_id = String()
    # Bytes field declared with `required=True`; carries the message body.
    payload = Bytes(required=True)
source_id = <pulsar.schema.definition.String object>
payload = <pulsar.schema.definition.Bytes object>
class BaseBytesCodec:
class BaseBytesCodec:
    """Minimal codec base: joins byte chunks; splitting is left to subclasses."""

    def __init__(self) -> None:
        """Create the codec; the base class keeps no state."""

    def encode(self, list_of_bytes: List[bytes]) -> bytes:
        """Concatenate the chunks of ``list_of_bytes``, in order, into one ``bytes``."""
        joined = bytes().join(list_of_bytes)
        return joined

    def decode(self, message_bytes: bytes) -> List[bytes]:
        """Split an encoded message back into chunks.

        Raises:
            NotImplementedError: Always; the base class defines no framing.
        """
        raise NotImplementedError()
def encode(self, list_of_bytes: List[bytes]) -> bytes:
17    def encode(self, list_of_bytes: List[bytes]) -> bytes:
18        return b''.join(list_of_bytes)
def decode(self, message_bytes: bytes) -> List[bytes]:
20    def decode(self, message_bytes: bytes) -> List[bytes]:
21        raise NotImplementedError
class NetworkCodec(BaseBytesCodec):
class NetworkCodec(BaseBytesCodec):
    """Codec for two-part messages: a fixed-size header followed by the data."""

    def __init__(self, header_size: int) -> None:
        """Store the header width used by ``decode``.

        Args:
            header_size (int): Number of leading bytes ``decode`` returns as the header.
        """
        super().__init__()
        self.header_size = header_size

    def encode(self, list_of_bytes: List[bytes]) -> bytes:
        """Concatenate a ``[header, data]`` pair into a single message.

        The header length is not compared with ``header_size``; ``decode`` only
        recovers the original pair when the header is exactly ``header_size``
        bytes long.

        Args:
            list_of_bytes (List[bytes]): Exactly two items, header then data.

        Returns:
            bytes: The header immediately followed by the data.

        Raises:
            AssertionError: If ``list_of_bytes`` does not hold exactly two items.
        """
        if len(list_of_bytes) != 2:
            # Raised explicitly instead of via `assert` so the check is not
            # stripped under `python -O`; the exception type is unchanged.
            raise AssertionError(
                'expected exactly 2 items (header, data), got %d' % len(list_of_bytes)
            )
        return b''.join(list_of_bytes)

    def decode(self, message_bytes: bytes) -> List[bytes]:
        """Split ``message_bytes`` at ``header_size`` into ``[header, data]``.

        A message shorter than ``header_size`` yields a short header and empty data.
        """
        header_bytes = message_bytes[:self.header_size]
        data_bytes = message_bytes[self.header_size:]
        return [header_bytes, data_bytes]
NetworkCodec(header_size: int)
25    def __init__(self, header_size: int) -> None:
26        super().__init__()
27        self.header_size = header_size
header_size
def encode(self, list_of_bytes: List[bytes]) -> bytes:
29    def encode(self, list_of_bytes: List[bytes]) -> bytes:
30        assert len(list_of_bytes) == 2
31        return b''.join(list_of_bytes)
def decode(self, message_bytes: bytes) -> List[bytes]:
33    def decode(self, message_bytes: bytes) -> List[bytes]:
34        header_bytes = message_bytes[:self.header_size]
35        data_bytes = message_bytes[self.header_size:]
36        return [header_bytes, data_bytes]
class GraphCodec:
class GraphCodec:
    """Fixed-layout codec for messages passed between operators of a graph.

    Wire format, in order:

    1. message UUID, ``msg_uuid_size`` bytes;
    2. upstream operator id, UTF-8, NUL-padded to ``op_from_size`` bytes;
    3. task-specific header, NUL-padded to ``header_size`` bytes;
    4. payload, all remaining bytes.
    """

    def __init__(self, msg_uuid_size: int = 16, op_from_size: int = 16, header_size: int = 0) -> None:
        """The GraphCodec is a codec that is designed to encode and decode messages for a graph-based system.
        The graph-based system is a directed graph where each node is an operator (data stream / model / destination).

        Args:
            msg_uuid_size (int, optional): The size of the message id in bytes. Defaults to 16, the length of
                ``uuid.UUID.bytes``; ``encode`` rejects a UUID of any other length.
            op_from_size (int, optional): The size of the upstream operator id field in bytes. Defaults to 16.
            header_size (int, optional): The size of the task-specific header field in bytes. Defaults to 0.
        """
        super().__init__()
        self.msg_uuid_size = msg_uuid_size
        self.op_from_size = op_from_size
        self.header_size = header_size

    def encode(self, msg_uuid: uuid.UUID, op_from: str, payload: bytes, header: Optional[bytes] = None) -> bytes:
        """Serialize one message into a single bytes object.

        The length of each leading field is fixed and determined by the `msg_uuid_size`, `op_from_size`, and
        `header_size` parameters in the constructor; shorter values are right-padded with NUL bytes.

        Args:
            msg_uuid (UUID): The message UUID.
            op_from (str): The id of the upstream op. Its UTF-8 encoding must fit in `op_from_size` bytes.
            payload (bytes): The actual payload.
            header (Optional[bytes]): An optional header for task-specific bookkeeping. ``None`` means empty.

        Returns:
            bytes: The encoded message.

        Raises:
            AssertionError: If the UUID is not `msg_uuid_size` bytes long, or if the encoded `op_from` or the
                `header` is longer than its field.
        """
        if header is None:
            header = b''
        uuid_bytes = msg_uuid.bytes
        # Measure the encoded form: the field width is in bytes, and a non-ASCII
        # id can need more bytes than it has characters.
        op_from_bytes = op_from.encode('utf-8')

        # Checks raise explicitly instead of using `assert` so they are not
        # stripped under `python -O`; the exception type is unchanged.
        if len(uuid_bytes) != self.msg_uuid_size:
            raise AssertionError(
                'msg_uuid is %d bytes, expected %d' % (len(uuid_bytes), self.msg_uuid_size)
            )
        if len(op_from_bytes) > self.op_from_size:
            raise AssertionError(
                'op_from is %d bytes once UTF-8 encoded, limit is %d' % (len(op_from_bytes), self.op_from_size)
            )
        if len(header) > self.header_size:
            raise AssertionError(
                'header is %d bytes, limit is %d' % (len(header), self.header_size)
            )

        return b''.join([
            uuid_bytes,
            op_from_bytes.ljust(self.op_from_size, b'\x00'),
            header.ljust(self.header_size, b'\x00'),
            payload,
        ])

    def decode(self, message_bytes: bytes) -> List:
        """Split an encoded message back into its four fields.

        Args:
            message_bytes (bytes): The message bytes to decode.

        Returns:
            List: ``[msg_uuid, op_from, header, payload]`` where ``msg_uuid`` is a ``uuid.UUID``, ``op_from`` is
            the upstream operator id as ``str``, and ``header`` and ``payload`` are ``bytes``. Trailing NUL
            padding is removed from ``header``, so a header that itself ends in NUL bytes loses them.
        """
        uuid_end = self.msg_uuid_size
        op_from_end = uuid_end + self.op_from_size
        header_end = op_from_end + self.header_size

        msg_uuid = uuid.UUID(bytes=message_bytes[:uuid_end])
        # The operator id is text: everything before the first NUL.
        op_from = message_bytes[uuid_end:op_from_end].partition(b'\x00')[0]
        # The header is binary and may contain NUL bytes, so only the trailing
        # padding is removed rather than cutting at the first NUL.
        header = message_bytes[op_from_end:header_end].rstrip(b'\x00')
        payload = message_bytes[header_end:]
        return [msg_uuid, op_from.decode('utf-8'), header, payload]
GraphCodec( msg_uuid_size: int = 16, op_from_size: int = 16, header_size: int = 0)
40    def __init__(self, msg_uuid_size: int = 16, op_from_size: int = 16, header_size: int = 0) -> None:
41        """The GraphCodec is a codec that is designed to encode and decode messages for a graph-based system.
42        The graph-based system is a directed graph where each node is an operator (data stream / model / destination).
43
44        Args:
45            msg_uuid_size (int, optional): The size of the message id. Defaults to 16.
46            op_from_size (int, optional): The size of the upstream operator id. Defaults to 16.
47            header_size (int, optional): The size of the task-specific header. Defaults to 0.
48        """
49        super().__init__()
50        self.msg_uuid_size = msg_uuid_size
51        self.op_from_size = op_from_size
52        self.header_size = header_size

The GraphCodec is a codec that is designed to encode and decode messages for a graph-based system. The graph-based system is a directed graph where each node is an operator (data stream / model / destination).

Arguments:
  • msg_uuid_size (int, optional): The size of the message id. Defaults to 16.
  • op_from_size (int, optional): The size of the upstream operator id. Defaults to 16.
  • header_size (int, optional): The size of the task-specific header. Defaults to 0.
msg_uuid_size
op_from_size
header_size
def encode( self, msg_uuid: uuid.UUID, op_from: str, payload: bytes, header: Optional[bytes] = None) -> bytes:
54    def encode(self, msg_uuid: uuid.UUID, op_from: str, payload: bytes, header: Optional[bytes] = None) -> bytes:
55        """The encode method should return a single bytes object that represents the message.
56        The length of each field is fixed and determined by the `msg_uuid_size`, `op_from_size`, and `header_size`
57        parameters in the constructor.
58
59        Args:
60            msg_uuid (UUID): The message UUID.
61            op_from (str): The id of the upstream op.
62            payload (bytes): The actual payload.
63            header (Optional[bytes]): An optional header for task-specific bookkeeping.
64
65        Returns:
66            bytes: The encoded message.
67        """
68        if header is None:
69            header = b''
70        assert len(msg_uuid.bytes) == self.msg_uuid_size
71        assert len(op_from) <= self.op_from_size
72        assert len(header) <= self.header_size
73
74        op_from = op_from.encode('utf-8').ljust(self.op_from_size, b'\x00')
75        header = header.ljust(self.header_size, b'\x00')
76        return b''.join([msg_uuid.bytes, op_from, header, payload])

The encode method should return a single bytes object that represents the message. The length of each field is fixed and determined by the msg_uuid_size, op_from_size, and header_size parameters in the constructor.

Arguments:
  • msg_uuid (UUID): The message UUID.
  • op_from (str): The id of the upstream op.
  • payload (bytes): The actual payload.
  • header (Optional[bytes]): An optional header for task-specific bookkeeping.
Returns:

bytes: The encoded message.

def decode(self, message_bytes: bytes) -> List[bytes]:
78    def decode(self, message_bytes: bytes) -> List[bytes]:
79        """The decode method should return a list of bytes, where the first element is the message id, the second
80        element is the operation from, the third element is the header, and the fourth element is the payload.
81
82        Args:
83            message_bytes (bytes): The message bytes to decode.
84        """
85        msg_uuid_bytes = message_bytes[:self.msg_uuid_size]
86        msg_uuid = uuid.UUID(bytes=msg_uuid_bytes)
87        op_from = message_bytes[self.msg_uuid_size:self.msg_uuid_size + self.op_from_size]
88        header = message_bytes[
89                 self.msg_uuid_size + self.op_from_size:self.msg_uuid_size + self.op_from_size + self.header_size]
90        payload = message_bytes[self.msg_uuid_size + self.op_from_size + self.header_size:]
91
92        if op_from.find(b'\x00') != -1:
93            op_from = op_from[:op_from.find(b'\x00')]
94        if header.find(b'\x00') != -1:
95            header = header[:header.find(b'\x00')]
96        return [msg_uuid, op_from.decode('utf-8'), header, payload]

The decode method returns a four-element list: the first element is the message id (a UUID), the second is the id of the upstream operator (a string), the third is the header (bytes), and the fourth is the payload (bytes).

Arguments:
  • message_bytes (bytes): The message bytes to decode.