edgeserve.model

import os
import time
import uuid
from typing import Callable, Optional, Tuple

import pulsar
import pickle
import pathlib
from _pulsar import InitialPosition, ConsumerType

from edgeserve.util import ftp_fetch, local_to_global_path
from edgeserve.message_format import GraphCodec


# This version intentionally drops schema support. Data sources cannot be combined adaptively.
class Model:
    """A model that takes in two topics and outputs to two topics.
    For input topics, one is the original data stream and the other is an intermittent signal stream.
    The original data stream is cached locally, waiting to be served.
    Actual model serving is only performed when a request is received from the intermittent signal stream.
    The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO;
    otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
    """

    def __init__(self,
                 task: Callable,
                 pulsar_node: str,
                 topic_in_data: str,
                 topic_out_destination: str,
                 topic_in_signal: Optional[str] = None,
                 topic_out_signal: Optional[str] = None,
                 gate_in_data: Optional[Callable] = None,
                 gate_out_destination: Optional[Callable] = None,
                 gate_in_signal: Optional[Callable] = None,
                 gate_out_signal: Optional[Callable] = None,
                 model_id: Optional[str] = 'model1',
                 header_size: Optional[int] = None,
                 ftp: Optional[bool] = False,
                 ftp_memory: Optional[bool] = False,
                 ftp_delete: Optional[bool] = False,
                 local_ftp_path: Optional[str] = '/srv/ftp/',
                 max_time_diff_ms: Optional[int] = 10 * 1000,
                 no_overlap: Optional[bool] = False,
                 min_interval_ms: Optional[int] = 0,
                 log_path: Optional[str] = None,
                 log_filename: Optional[str] = None,
                 acknowledge: Optional[bool] = False,
                 receiver_queue_size: Optional[int] = 10000) -> None:
        """Initializes a model.

        Args:
            task: The actual task to be performed.
            pulsar_node: Address of Apache Pulsar server.
            topic_in_data: Pulsar topic of the input data stream.
            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
            topic_out_destination: Pulsar topic of the output destination stream.
            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
            gate_in_data: The gating function applied to input data stream (but not the signal stream).
            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
            gate_in_signal: The gating function applied to input signal stream.
            gate_out_signal: The gating function applied to output signal stream.
            model_id: The unique identifier of the model.
            ftp: When set to `True`, lazy routing mode is enabled.
            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
            no_overlap: When set to `True`, we ensure that every message is at most processed once.
            min_interval_ms: The minimum time interval between two consecutive runs.
            log_path: Path to store the replay log. When set to `None`, log is disabled.
            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
            acknowledge: When set to `True`, the model acknowledges every message it receives.
            receiver_queue_size: The size of the receiver queue for Pulsar.
        """
        self.client = pulsar.Client(pulsar_node)
        self.producer_destination = self.client.create_producer(topic_out_destination,
                                                                schema=pulsar.schema.BytesSchema())
        self.topic_out_signal = topic_out_signal
        if topic_out_signal:
            self.producer_signal = self.client.create_producer(topic_out_signal,
                                                               schema=pulsar.schema.BytesSchema())
        self.task = task
        self.topic_in_data = topic_in_data
        self.topic_in_signal = topic_in_signal
        self.consumer = self._subscribe()
        self.gate_in_data = (lambda x: x) if gate_in_data is None else gate_in_data
        self.gate_out_destination = (lambda x: x) if gate_out_destination is None else gate_out_destination
        self.gate_in_signal = (lambda x: x) if gate_in_signal is None else gate_in_signal
        self.gate_out_signal = (lambda x: x) if gate_out_signal is None else gate_out_signal
        self.model_id = model_id
        self.ftp = ftp  # consider changing this name to ftp_in
        self.local_ftp_path = local_ftp_path
        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
        self.ftp_delete = ftp_delete
        self.cached = dict()
        self.latest_msg_uuid = dict()
        self.latest_msg_publish_time_ms = dict()
        self.latest_msg_consumed_time_ms = dict()
        self.max_time_diff_ms = max_time_diff_ms
        self.no_overlap = no_overlap
        self.min_interval_ms = min_interval_ms  # prediction frequency
        self.last_run_start_ms = 0
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        self.last_log_duration_ms = -1
        self.acknowledge = acknowledge
        self.receiver_queue_size = receiver_queue_size

        self.header_size = header_size if header_size else 32
        self.graph_codec = GraphCodec(header_size=self.header_size)
        self.cached_data = dict()
        self.cached_signals = dict()

    def _subscribe(self):
        if self.topic_in_signal:
            return self.client.subscribe([self.topic_in_data, self.topic_in_signal],
                                         subscription_name='compute-sub',
                                         consumer_type=ConsumerType.Shared,
                                         schema=pulsar.schema.BytesSchema(),
                                         initial_position=InitialPosition.Earliest,
                                         receiver_queue_size=self.receiver_queue_size)

        return self.client.subscribe(self.topic_in_data, subscription_name='compute-sub',
                                     consumer_type=ConsumerType.Shared, schema=pulsar.schema.BytesSchema(),
                                     initial_position=InitialPosition.Earliest,
                                     receiver_queue_size=self.receiver_queue_size)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    def _try_task(self, flow_id) -> Tuple[bool, Optional[bool], Optional[uuid.UUID], Optional[bytes]]:
        """Performs actual computation.

        Args:
            flow_id: ID of the data flow to be processed.

        Returns:
            is_performed: Whether the job is actually performed.
            is_satisfied: Whether the prediction is considered satisfied. If set to `True`, the result is sent to the
                          destination topic; if set to `False`, the result is sent to a more expensive model.
            msg_out_uuid: The UUID of the output message. If set to `None`, no message is sent out.
            output: The prediction result in bytes.
        """
        # When `topic_in_signal` is not `None`, we only do actual computation when both data stream and signal stream
        # from the same `flow_id` are present.
        if self.topic_in_signal and (flow_id not in self.cached_data or flow_id not in self.cached_signals):
            return False, None, None, None

        # Avoid running too frequently for expensive tasks
        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
            return False, None, None, None

        self.last_run_start_ms = time.time() * 1000

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        if self.ftp:
            if not self.cached_data[flow_id].startswith('ftp://'):
                raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
            local_file_path = ftp_fetch(self.cached_data[flow_id], self.local_ftp_path, memory=self.ftp_memory,
                                        delete=self.ftp_delete)
            self.cached_data[flow_id] = local_file_path

        if self.topic_in_signal:
            is_satisfied, output = self.task(self.cached_data[flow_id], self.cached_signals[flow_id])
        else:
            is_satisfied, output = self.task(self.cached_data[flow_id])
        last_run_finish_ms = time.time() * 1000

        if self.ftp and not self.ftp_memory:
            output = self._write_ftp(output, local_file_path, last_run_finish_ms)

        msg_out_uuid = uuid.uuid4()
        # If log_path is not None, we write aggregation decisions to a log file.
        if self.log_path and os.path.isdir(self.log_path):
            self._log_aggregation_decisions(flow_id, last_run_finish_ms, is_satisfied, msg_out_uuid, output)

        self._clear_cache(flow_id)
        return True, is_satisfied, msg_out_uuid, output

    def _write_ftp(self, output, local_file_path, last_run_finish_ms):
        # For now, use the completion timestamp as the filename of output FTP file
        ftp_output_dir = os.path.join(os.path.dirname(local_file_path), 'ftp_output')
        pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
        with open(os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp', 'w') as f:
            f.write(output)
        output = os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp'
        return local_to_global_path(output, self.local_ftp_path)

    def _clear_cache(self, flow_id):
        del self.cached_data[flow_id]
        if self.topic_in_signal:
            del self.cached_signals[flow_id]

    def _log_incoming_msg(self, msg_in_uuid, flow_id, msg_in):
        self.latest_msg_consumed_time_ms[flow_id] = time.time() * 1000
        if flow_id not in self.latest_msg_publish_time_ms:
            self.latest_msg_publish_time_ms[flow_id] = [msg_in.publish_timestamp()]
            self.latest_msg_uuid[flow_id] = [str(msg_in_uuid)]
        else:
            self.latest_msg_publish_time_ms[flow_id].append(msg_in.publish_timestamp())
            self.latest_msg_uuid[flow_id].append(str(msg_in_uuid))

    def _log_aggregation_decisions(self, flow_id, last_run_finish_ms, is_satisfied, msg_out_uuid, msg_out_payload):
        replay_log = {'msg_in_uuid': self.latest_msg_uuid,
                      'msg_in_publish_time_ms': self.latest_msg_publish_time_ms,
                      'msg_in_consumed_time_ms': self.latest_msg_consumed_time_ms,
                      'start_compute_time_ms': self.last_run_start_ms,
                      'finish_compute_time_ms': last_run_finish_ms,
                      'last_log_duration_ms': self.last_log_duration_ms,
                      'data_in_payload': self.cached_data[flow_id],
                      'signal_in_payload': self.cached_signals[flow_id] if self.topic_in_signal else None,
                      'msg_out_is_satisfied': is_satisfied,
                      'msg_out_uuid': msg_out_uuid,
                      'msg_out_payload': msg_out_payload}
        with open(os.path.join(self.log_path, self.log_filename + '.log'), 'ab') as f:
            pickle.dump(replay_log, f)
        self.last_log_duration_ms = time.time() * 1000 - last_run_finish_ms

        # If no_overlap, reset latest_msg and latest_msg_time_ms so a message won't be processed twice.
        if self.no_overlap:
            self.latest_msg_uuid = dict()
            self.latest_msg_publish_time_ms = dict()
            self.latest_msg_consumed_time_ms = dict()

    def __iter__(self):
        return self

    def __next__(self):
        msg_in = self.consumer.receive()
        msg_in_uuid, op_from, flow_id, payload = self.graph_codec.decode(msg_in.value())
        actual_topic_in = msg_in.topic_name().split('/')[-1]

        if actual_topic_in == self.topic_in_signal or msg_in.topic_name() == self.topic_in_signal:
            # We locally cache the prediction result ("signal") from another model.
            self.cached_signals[flow_id] = self.gate_in_signal(payload)
        elif actual_topic_in == self.topic_in_data or msg_in.topic_name() == self.topic_in_data:
            self.cached_data[flow_id] = self.gate_in_data(payload)
        else:
            raise ValueError(
                "The consumer's topic name does not match that of incoming message. The topic of incoming message is",
                msg_in.topic_name())

        if self.log_path and os.path.isdir(self.log_path):
            self._log_incoming_msg(msg_in_uuid, flow_id, msg_in)

        is_performed, is_satisfied, msg_out_uuid, output = self._try_task(flow_id)
        if is_performed:
            output = self.gate_out_destination(output)
        else:
            output = self.gate_out_signal(output)

        if output:
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            if is_satisfied:
                self.producer_destination.send(self.graph_codec.encode(msg_out_uuid, self.model_id, output, flow_id))
            elif self.topic_out_signal:
                self.producer_signal.send(self.graph_codec.encode(msg_out_uuid, self.model_id, output, flow_id))
            else:
                raise ValueError("The result is not satisfactory but output signal topic is not present.")

        if self.acknowledge:
            self.consumer.acknowledge(msg_in)

        return output if output else None
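
As the source above shows, the `task` callable is invoked with the cached data payload, plus the cached signal payload when `topic_in_signal` is configured, and is expected to return a pair `(is_satisfied, output)`. Below is a minimal sketch of such a callable, assuming the decoded payload arrives as bytes; the uppercase "model" and the length-based SLO check are illustrative placeholders, not part of edgeserve.

from typing import Optional, Tuple

def example_task(data: bytes, signal: Optional[bytes] = None) -> Tuple[bool, bytes]:
    # Placeholder "model": echo the payload back in upper case.
    output = data.upper()
    # Stand-in for a user-defined SLO check; a real task might compare a
    # confidence score against a threshold instead.
    is_satisfied = len(data) < 1024
    return is_satisfied, output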
class Model:

A model that takes in two topics and outputs to two topics. For input topics, one is the original data stream and the other is an intermittent signal stream. The original data stream is cached locally, waiting to be served. Actual model serving is only performed when a request is received from the intermittent signal stream. The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO; otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
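
For example, the cascade described above can be wired up by pointing a cheaper model's `topic_out_signal` at a more expensive model's `topic_in_signal`, with both models consuming the same data topic. The sketch below is illustrative only: the broker address, topic names, model IDs, and task functions are assumptions rather than part of the library.

from edgeserve.model import Model

def cheap_task(data):
    # Fast placeholder model; escalate anything it is not confident about.
    return len(data) < 1024, data.upper()

def expensive_task(data, signal):
    # Slower placeholder model, run only after the cheap model escalates.
    return True, data.upper()

# In practice each Model would typically run on its own node; they are shown
# together here only to make the topic wiring explicit.
cheap = Model(task=cheap_task,
              pulsar_node='pulsar://localhost:6650',
              topic_in_data='sensor-data',
              topic_out_destination='predictions',
              topic_out_signal='escalations',  # unsatisfied predictions go here
              model_id='cheap-model')

expensive = Model(task=expensive_task,
                  pulsar_node='pulsar://localhost:6650',
                  topic_in_data='sensor-data',    # caches the same data stream
                  topic_in_signal='escalations',  # woken up by the cheap model's signal
                  topic_out_destination='predictions',
                  model_id='expensive-model')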

Model( task: Callable, pulsar_node: str, topic_in_data: str, topic_out_destination: str, topic_in_signal: Optional[str] = None, topic_out_signal: Optional[str] = None, gate_in_data: Optional[Callable] = None, gate_out_destination: Optional[Callable] = None, gate_in_signal: Optional[Callable] = None, gate_out_signal: Optional[Callable] = None, model_id: Optional[str] = 'model1', header_size: Optional[int] = None, ftp: Optional[bool] = False, ftp_memory: Optional[bool] = False, ftp_delete: Optional[bool] = False, local_ftp_path: Optional[str] = '/srv/ftp/', max_time_diff_ms: Optional[int] = 10000, no_overlap: Optional[bool] = False, min_interval_ms: Optional[int] = 0, log_path: Optional[str] = None, log_filename: Optional[str] = None, acknowledge: Optional[bool] = False, receiver_queue_size: Optional[int] = 10000)

Initializes a model.

Arguments:
  • task: The actual task to be performed.
  • pulsar_node: Address of Apache Pulsar server.
  • topic_in_data: Pulsar topic of the input data stream.
  • topic_in_signal: Pulsar topic of the input signal stream. When set to None, no signal stream is present.
  • topic_out_destination: Pulsar topic of the output destination stream.
  • topic_out_signal: Pulsar topic of the output signal stream. When set to None, no signal stream is present.
  • gate_in_data: The gating function applied to input data stream (but not the signal stream).
  • gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
  • gate_in_signal: The gating function applied to input signal stream.
  • gate_out_signal: The gating function applied to output signal stream.
  • model_id: The unique identifier of the model.
  • ftp: When set to True, lazy routing mode is enabled.
  • ftp_memory: When set to True, in-memory lazy routing mode is enabled. Only effective when ftp=True.
  • ftp_delete: When set to True, delete remote data after fetching is complete. Only effective when ftp=True.
  • local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
  • max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
  • no_overlap: When set to True, we ensure that every message is at most processed once.
  • min_interval_ms: The minimum time interval between two consecutive runs.
  • log_path: Path to store the replay log. When set to None, log is disabled.
  • log_filename: File name of replay log. When set to None, the current timestamp is used as file name.
  • acknowledge: When set to True, the model acknowledges every message it receives.
  • receiver_queue_size: The size of the receiver queue for Pulsar.
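
A minimal serving loop is sketched below, using the context-manager and iterator protocols the class implements. The broker address, topic names, and the trivial task are illustrative assumptions, not values prescribed by edgeserve.

from edgeserve.model import Model

def my_task(data):
    return True, data  # trivially satisfied placeholder task

with Model(task=my_task,
           pulsar_node='pulsar://localhost:6650',
           topic_in_data='sensor-data',
           topic_out_destination='predictions',
           acknowledge=True) as model:
    for output in model:        # each iteration consumes one incoming message
        if output is not None:  # None means no prediction was sent this round
            print(output)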
client
producer_destination
topic_out_signal
task
topic_in_data
topic_in_signal
consumer
gate_in_data
gate_out_destination
gate_in_signal
gate_out_signal
model_id
ftp
local_ftp_path
ftp_memory
ftp_delete
cached
latest_msg_uuid
latest_msg_publish_time_ms
latest_msg_consumed_time_ms
max_time_diff_ms
no_overlap
min_interval_ms
last_run_start_ms
log_path
log_filename
last_log_duration_ms
acknowledge
receiver_queue_size
header_size
graph_codec
cached_data
cached_signals