edgeserve.compute

  1import os
  2import time
  3import uuid
  4
  5import pulsar
  6import pickle
  7import pathlib
  8from _pulsar import InitialPosition, ConsumerType
  9from inspect import signature
 10
 11from edgeserve.util import ftp_fetch, local_to_global_path
 12from edgeserve.message_format import GraphCodec
 13
 14
 15class Compute:
 16    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
 17                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
 18                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
 19                 drop_if_older_than_ms=None, log_verbose=False):
 20        """Initializes a compute operator.
 21
 22        Args:
 23            task: The actual task to be performed.
 24            pulsar_node: Address of Apache Pulsar server.
 25            worker_id: The unique identifier of this operator.
 26            gate_in: The gating function applied to input stream.
 27            gate_out: The gating function applied to output stream.
 28            ftp: When set to `True`, lazy routing mode is enabled.
 29            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
 30            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
 31            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
 32            topic_in: Pulsar topic of the input data stream.
 33            topic_out: Pulsar topic of the output data stream.
 34            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
 35            no_overlap: When set to `True`, we ensure that every message is at most processed once.
 36            min_interval_ms: The minimum time interval between two consecutive runs.
 37            log_path: Path to store the replay log. When set to `None`, log is disabled.
 38            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
 39            drop_if_older_than_ms: When set to a value, messages older than this value relative to current time are dropped.
 40            log_verbose: When set to `False`, we only log in the replay log when a join operation is performed.
 41        """
 42        self.client = pulsar.Client(pulsar_node)
 43        self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())
 44        self.consumer = self.client.subscribe(topic_in, subscription_name='compute-sub',
 45                                              consumer_type=ConsumerType.Shared,
 46                                              schema=pulsar.schema.BytesSchema(),
 47                                              initial_position=InitialPosition.Earliest)
 48        self.task = task
 49        self.worker_id = worker_id
 50        self.gate_in = (lambda x: x) if gate_in is None else gate_in
 51        self.gate_out = (lambda x: x) if gate_out is None else gate_out
 52        self.ftp = ftp  # consider changing this name to ftp_in
 53        self.local_ftp_path = local_ftp_path
 54        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
 55        self.ftp_delete = ftp_delete
 56        self.latest_msg = dict()
 57        self.latest_msg_in_uuid = dict()
 58        self.latest_msg_publish_time_ms = dict()
 59        self.latest_msg_consumed_time_ms = dict()
 60        self.max_time_diff_ms = max_time_diff_ms
 61        self.no_overlap = no_overlap
 62        self.min_interval_ms = min_interval_ms  # prediction frequency
 63        self.last_run_start_ms = 0
 64        self.last_run_finish_ms = 0
 65        self.log_path = log_path
 66        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
 67        self.drop_if_older_than_ms = drop_if_older_than_ms
 68        self.log_verbose = log_verbose
 69        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
 70
 71    def __enter__(self):
 72        return self
 73
 74    def __exit__(self, exc_type, exc_val, exc_tb):
 75        self.client.close()
 76
 77    def _try_task(self):
 78        # Avoid running too frequently for expensive tasks
 79        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
 80            return False, None, None
 81
 82        if len(self.latest_msg) < len(signature(self.task).parameters):
 83            return False, None, None
 84
 85        earliest = None
 86        latest = None
 87        for source_id in self.latest_msg.keys():
 88            if earliest is None or self.latest_msg_publish_time_ms[source_id] < earliest:
 89                earliest = self.latest_msg_publish_time_ms[source_id]
 90            if latest is None or self.latest_msg_publish_time_ms[source_id] > latest:
 91                latest = self.latest_msg_publish_time_ms[source_id]
 92            if latest - earliest > self.max_time_diff_ms:
 93                return False, None, None
 94        self.last_run_start_ms = time.time() * 1000
 95
 96        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
 97        if self.ftp:
 98            for source_id in self.latest_msg.keys():
 99                if 'ftp://' in self.latest_msg[source_id]:
100                    local_file_path = ftp_fetch(self.latest_msg[source_id], self.local_ftp_path, memory=self.ftp_memory, delete=self.ftp_delete)
101                    self.latest_msg[source_id] = local_file_path
102
103        output = self.task(**self.latest_msg)
104        self.last_run_finish_ms = time.time() * 1000
105        msg_out_uuid = uuid.uuid4()
106
107        # For now, use the completion timestamp as the filename of output FTP file
108        if self.ftp and not self.ftp_memory and output:
109            ftp_output_dir = os.path.join(os.path.dirname(local_file_path), 'ftp_output')
110            pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
111            with open(os.path.join(ftp_output_dir, str(self.last_run_finish_ms)) + '.ftp', 'w') as f:
112                f.write(output)
113            output = os.path.join(ftp_output_dir, str(self.last_run_finish_ms)) + '.ftp'
114
115        # If no_overlap, reset latest_msg and latest_msg_time_ms so a message won't be processed twice.
116        if self.no_overlap:
117            self.latest_msg = dict()
118            self.latest_msg_in_uuid = dict()
119            self.latest_msg_publish_time_ms = dict()
120            self.latest_msg_consumed_time_ms = dict()
121
122        return True, msg_out_uuid, output
123
124    def __iter__(self):
125        return self
126
127    def __next__(self):
128        msg_in = self.consumer.receive()
129
130        if self.drop_if_older_than_ms is not None:
131            assert isinstance(self.drop_if_older_than_ms, int)
132            if msg_in.publish_timestamp() + self.drop_if_older_than_ms < time.time() * 1000:
133                # incoming message is too old, skip it.
134                self.consumer.acknowledge(msg_in)
135                return None
136
137        msg_in_uuid, op_from, _, payload = self.graph_codec.decode(msg_in.value())
138
139        data = self.gate_in(payload)  # path to file if ftp, raw data in bytes otherwise
140        if data is not None:
141            self.latest_msg_publish_time_ms[op_from] = msg_in.publish_timestamp()
142            self.latest_msg_consumed_time_ms[op_from] = time.time() * 1000
143            self.latest_msg[op_from] = data
144            self.latest_msg_in_uuid[op_from] = msg_in_uuid
145
146        if self.ftp and not self.ftp_memory:  # FTP file mode
147            # download the file from FTP server and then delete the file from server
148            if not data.startswith('ftp://'):
149                return None
150            self.latest_msg[op_from] = data
151            ret, msg_out_uuid, output = self._try_task()
152            if ret and output:
153                global_file_path = local_to_global_path(output, self.local_ftp_path)
154                output = self.gate_out(global_file_path)
155        else:
156            if self.ftp:  # FTP memory mode
157                self.latest_msg[op_from] = data
158            # memory mode
159            ret, msg_out_uuid, output = self._try_task()
160            if ret and output:
161                output = self.gate_out(output)
162
163        # If log_path is not None, we write aggregation decisions to a log file.
164        if self.log_path and os.path.isdir(self.log_path):
165            if ret or self.log_verbose:
166                replay_log = {'msg_in_uuid': self.latest_msg_in_uuid,
167                              'msg_in_payload': self.latest_msg,
168                              'msg_out_uuid': msg_out_uuid,
169                              'msg_out_payload': output,
170                              'msg_publish_time_ms': self.latest_msg_publish_time_ms,
171                              'msg_consumed_time_ms': self.latest_msg_consumed_time_ms,
172                              'start_compute_time_ms': self.last_run_start_ms,
173                              'finish_compute_time_ms': self.last_run_finish_ms,
174                              'worker_id': self.worker_id,
175                              'is_join_performed': ret}
176                with open(os.path.join(self.log_path, self.log_filename + '.compute'), 'ab') as f:
177                    pickle.dump(replay_log, f)
178
179        if output:
180            if self.log_path and os.path.isdir(self.log_path):
181                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
182                    pickle.dump(output, f)
183            if type(output) == str:
184                output = output.encode('utf-8')
185            msg_out = self.graph_codec.encode(msg_uuid=msg_out_uuid, op_from=self.worker_id, payload=output)
186            self.producer.send(msg_out)
187            self.consumer.acknowledge(msg_in)
188            return output
189        else:
190            # No output is given, no need to materialize
191            self.consumer.acknowledge(msg_in)
192            return None
class Compute:
 16class Compute:
 17    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
 18                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
 19                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
 20                 drop_if_older_than_ms=None, log_verbose=False):
 21        """Initializes a compute operator.
 22
 23        Args:
 24            task: The actual task to be performed.
 25            pulsar_node: Address of Apache Pulsar server.
 26            worker_id: The unique identifier of this operator.
 27            gate_in: The gating function applied to input stream.
 28            gate_out: The gating function applied to output stream.
 29            ftp: When set to `True`, lazy routing mode is enabled.
 30            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
 31            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
 32            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
 33            topic_in: Pulsar topic of the input data stream.
 34            topic_out: Pulsar topic of the output data stream.
 35            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
 36            no_overlap: When set to `True`, we ensure that every message is at most processed once.
 37            min_interval_ms: The minimum time interval between two consecutive runs.
 38            log_path: Path to store the replay log. When set to `None`, log is disabled.
 39            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
 40            drop_if_older_than_ms: When set to a value, messages older than this value relative to current time are dropped.
 41            log_verbose: When set to `False`, we only log in the replay log when a join operation is performed.
 42        """
 43        self.client = pulsar.Client(pulsar_node)
 44        self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())
 45        self.consumer = self.client.subscribe(topic_in, subscription_name='compute-sub',
 46                                              consumer_type=ConsumerType.Shared,
 47                                              schema=pulsar.schema.BytesSchema(),
 48                                              initial_position=InitialPosition.Earliest)
 49        self.task = task
 50        self.worker_id = worker_id
 51        self.gate_in = (lambda x: x) if gate_in is None else gate_in
 52        self.gate_out = (lambda x: x) if gate_out is None else gate_out
 53        self.ftp = ftp  # consider changing this name to ftp_in
 54        self.local_ftp_path = local_ftp_path
 55        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
 56        self.ftp_delete = ftp_delete
 57        self.latest_msg = dict()
 58        self.latest_msg_in_uuid = dict()
 59        self.latest_msg_publish_time_ms = dict()
 60        self.latest_msg_consumed_time_ms = dict()
 61        self.max_time_diff_ms = max_time_diff_ms
 62        self.no_overlap = no_overlap
 63        self.min_interval_ms = min_interval_ms  # prediction frequency
 64        self.last_run_start_ms = 0
 65        self.last_run_finish_ms = 0
 66        self.log_path = log_path
 67        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
 68        self.drop_if_older_than_ms = drop_if_older_than_ms
 69        self.log_verbose = log_verbose
 70        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
 71
 72    def __enter__(self):
 73        return self
 74
 75    def __exit__(self, exc_type, exc_val, exc_tb):
 76        self.client.close()
 77
 78    def _try_task(self):
 79        # Avoid running too frequently for expensive tasks
 80        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
 81            return False, None, None
 82
 83        if len(self.latest_msg) < len(signature(self.task).parameters):
 84            return False, None, None
 85
 86        earliest = None
 87        latest = None
 88        for source_id in self.latest_msg.keys():
 89            if earliest is None or self.latest_msg_publish_time_ms[source_id] < earliest:
 90                earliest = self.latest_msg_publish_time_ms[source_id]
 91            if latest is None or self.latest_msg_publish_time_ms[source_id] > latest:
 92                latest = self.latest_msg_publish_time_ms[source_id]
 93            if latest - earliest > self.max_time_diff_ms:
 94                return False, None, None
 95        self.last_run_start_ms = time.time() * 1000
 96
 97        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
 98        if self.ftp:
 99            for source_id in self.latest_msg.keys():
100                if 'ftp://' in self.latest_msg[source_id]:
101                    local_file_path = ftp_fetch(self.latest_msg[source_id], self.local_ftp_path, memory=self.ftp_memory, delete=self.ftp_delete)
102                    self.latest_msg[source_id] = local_file_path
103
104        output = self.task(**self.latest_msg)
105        self.last_run_finish_ms = time.time() * 1000
106        msg_out_uuid = uuid.uuid4()
107
108        # For now, use the completion timestamp as the filename of output FTP file
109        if self.ftp and not self.ftp_memory and output:
110            ftp_output_dir = os.path.join(os.path.dirname(local_file_path), 'ftp_output')
111            pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
112            with open(os.path.join(ftp_output_dir, str(self.last_run_finish_ms)) + '.ftp', 'w') as f:
113                f.write(output)
114            output = os.path.join(ftp_output_dir, str(self.last_run_finish_ms)) + '.ftp'
115
116        # If no_overlap, reset latest_msg and latest_msg_time_ms so a message won't be processed twice.
117        if self.no_overlap:
118            self.latest_msg = dict()
119            self.latest_msg_in_uuid = dict()
120            self.latest_msg_publish_time_ms = dict()
121            self.latest_msg_consumed_time_ms = dict()
122
123        return True, msg_out_uuid, output
124
125    def __iter__(self):
126        return self
127
128    def __next__(self):
129        msg_in = self.consumer.receive()
130
131        if self.drop_if_older_than_ms is not None:
132            assert isinstance(self.drop_if_older_than_ms, int)
133            if msg_in.publish_timestamp() + self.drop_if_older_than_ms < time.time() * 1000:
134                # incoming message is too old, skip it.
135                self.consumer.acknowledge(msg_in)
136                return None
137
138        msg_in_uuid, op_from, _, payload = self.graph_codec.decode(msg_in.value())
139
140        data = self.gate_in(payload)  # path to file if ftp, raw data in bytes otherwise
141        if data is not None:
142            self.latest_msg_publish_time_ms[op_from] = msg_in.publish_timestamp()
143            self.latest_msg_consumed_time_ms[op_from] = time.time() * 1000
144            self.latest_msg[op_from] = data
145            self.latest_msg_in_uuid[op_from] = msg_in_uuid
146
147        if self.ftp and not self.ftp_memory:  # FTP file mode
148            # download the file from FTP server and then delete the file from server
149            if not data.startswith('ftp://'):
150                return None
151            self.latest_msg[op_from] = data
152            ret, msg_out_uuid, output = self._try_task()
153            if ret and output:
154                global_file_path = local_to_global_path(output, self.local_ftp_path)
155                output = self.gate_out(global_file_path)
156        else:
157            if self.ftp:  # FTP memory mode
158                self.latest_msg[op_from] = data
159            # memory mode
160            ret, msg_out_uuid, output = self._try_task()
161            if ret and output:
162                output = self.gate_out(output)
163
164        # If log_path is not None, we write aggregation decisions to a log file.
165        if self.log_path and os.path.isdir(self.log_path):
166            if ret or self.log_verbose:
167                replay_log = {'msg_in_uuid': self.latest_msg_in_uuid,
168                              'msg_in_payload': self.latest_msg,
169                              'msg_out_uuid': msg_out_uuid,
170                              'msg_out_payload': output,
171                              'msg_publish_time_ms': self.latest_msg_publish_time_ms,
172                              'msg_consumed_time_ms': self.latest_msg_consumed_time_ms,
173                              'start_compute_time_ms': self.last_run_start_ms,
174                              'finish_compute_time_ms': self.last_run_finish_ms,
175                              'worker_id': self.worker_id,
176                              'is_join_performed': ret}
177                with open(os.path.join(self.log_path, self.log_filename + '.compute'), 'ab') as f:
178                    pickle.dump(replay_log, f)
179
180        if output:
181            if self.log_path and os.path.isdir(self.log_path):
182                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
183                    pickle.dump(output, f)
184            if type(output) == str:
185                output = output.encode('utf-8')
186            msg_out = self.graph_codec.encode(msg_uuid=msg_out_uuid, op_from=self.worker_id, payload=output)
187            self.producer.send(msg_out)
188            self.consumer.acknowledge(msg_in)
189            return output
190        else:
191            # No output is given, no need to materialize
192            self.consumer.acknowledge(msg_in)
193            return None
Compute( task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False, ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst', max_time_diff_ms=10000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None, drop_if_older_than_ms=None, log_verbose=False)
17    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
18                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
19                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
20                 drop_if_older_than_ms=None, log_verbose=False):
21        """Initializes a compute operator.
22
23        Args:
24            task: The actual task to be performed.
25            pulsar_node: Address of Apache Pulsar server.
26            worker_id: The unique identifier of this operator.
27            gate_in: The gating function applied to input stream.
28            gate_out: The gating function applied to output stream.
29            ftp: When set to `True`, lazy routing mode is enabled.
30            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
31            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
32            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
33            topic_in: Pulsar topic of the input data stream.
34            topic_out: Pulsar topic of the output data stream.
35            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
36            no_overlap: When set to `True`, we ensure that every message is at most processed once.
37            min_interval_ms: The minimum time interval between two consecutive runs.
38            log_path: Path to store the replay log. When set to `None`, log is disabled.
39            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
40            drop_if_older_than_ms: When set to a value, messages older than this value relative to current time are dropped.
41            log_verbose: When set to `False`, we only log in the replay log when a join operation is performed.
42        """
43        self.client = pulsar.Client(pulsar_node)
44        self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())
45        self.consumer = self.client.subscribe(topic_in, subscription_name='compute-sub',
46                                              consumer_type=ConsumerType.Shared,
47                                              schema=pulsar.schema.BytesSchema(),
48                                              initial_position=InitialPosition.Earliest)
49        self.task = task
50        self.worker_id = worker_id
51        self.gate_in = (lambda x: x) if gate_in is None else gate_in
52        self.gate_out = (lambda x: x) if gate_out is None else gate_out
53        self.ftp = ftp  # consider changing this name to ftp_in
54        self.local_ftp_path = local_ftp_path
55        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
56        self.ftp_delete = ftp_delete
57        self.latest_msg = dict()
58        self.latest_msg_in_uuid = dict()
59        self.latest_msg_publish_time_ms = dict()
60        self.latest_msg_consumed_time_ms = dict()
61        self.max_time_diff_ms = max_time_diff_ms
62        self.no_overlap = no_overlap
63        self.min_interval_ms = min_interval_ms  # prediction frequency
64        self.last_run_start_ms = 0
65        self.last_run_finish_ms = 0
66        self.log_path = log_path
67        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
68        self.drop_if_older_than_ms = drop_if_older_than_ms
69        self.log_verbose = log_verbose
70        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

Initializes a compute operator.

Arguments:
  • task: The actual task to be performed.
  • pulsar_node: Address of Apache Pulsar server.
  • worker_id: The unique identifier of this operator.
  • gate_in: The gating function applied to input stream.
  • gate_out: The gating function applied to output stream.
  • ftp: When set to True, lazy routing mode is enabled.
  • ftp_memory: When set to True, in-memory lazy routing mode is enabled. Only effective when ftp=True.
  • ftp_delete: When set to True, delete remote data after fetching is complete. Only effective when ftp=True.
  • local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
  • topic_in: Pulsar topic of the input data stream.
  • topic_out: Pulsar topic of the output data stream.
  • max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
  • no_overlap: When set to True, we ensure that every message is processed at most once.
  • min_interval_ms: The minimum time interval between two consecutive runs.
  • log_path: Path to store the replay log. When set to None, log is disabled.
  • log_filename: File name of replay log. When set to None, the current timestamp is used as file name.
  • drop_if_older_than_ms: When set to a value, messages older than this value relative to current time are dropped.
  • log_verbose: When set to False, we only log in the replay log when a join operation is performed.
client
producer
consumer
task
worker_id
gate_in
gate_out
ftp
local_ftp_path
ftp_memory
ftp_delete
latest_msg
latest_msg_in_uuid
latest_msg_publish_time_ms
latest_msg_consumed_time_ms
max_time_diff_ms
no_overlap
min_interval_ms
last_run_start_ms
last_run_finish_ms
log_path
log_filename
drop_if_older_than_ms
log_verbose
graph_codec