edgeserve.model
import os
import time
import uuid
from typing import Callable, Optional, Tuple

import pulsar
import pickle
import pathlib
from _pulsar import InitialPosition, ConsumerType

from edgeserve.util import ftp_fetch, local_to_global_path
from edgeserve.message_format import GraphCodec


# This version intentionally drops schema support. Data sources cannot be combined adaptively.
class Model:
    """A model that takes in two topics and outputs to two topics.

    For input topics, one is the original data stream and the other is an intermittent signal stream.
    The original data stream is cached locally, waiting to be served.
    Actual model serving is only performed when a request is received from the intermittent signal stream.
    The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO;
    otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
    """

    def __init__(self,
                 task: Callable,
                 pulsar_node: str,
                 topic_in_data: str,
                 topic_out_destination: str,
                 topic_in_signal: Optional[str] = None,
                 topic_out_signal: Optional[str] = None,
                 gate_in_data: Optional[Callable] = None,
                 gate_out_destination: Optional[Callable] = None,
                 gate_in_signal: Optional[Callable] = None,
                 gate_out_signal: Optional[Callable] = None,
                 model_id: Optional[str] = 'model1',
                 header_size: Optional[int] = None,
                 ftp: Optional[bool] = False,
                 ftp_memory: Optional[bool] = False,
                 ftp_delete: Optional[bool] = False,
                 local_ftp_path: Optional[str] = '/srv/ftp/',
                 max_time_diff_ms: Optional[int] = 10 * 1000,
                 no_overlap: Optional[bool] = False,
                 min_interval_ms: Optional[int] = 0,
                 log_path: Optional[str] = None,
                 log_filename: Optional[str] = None,
                 acknowledge: Optional[bool] = False,
                 receiver_queue_size: Optional[int] = 10000) -> None:
        """Initializes a model.

        Args:
            task: The actual task to be performed.
            pulsar_node: Address of Apache Pulsar server.
            topic_in_data: Pulsar topic of the input data stream.
            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
            topic_out_destination: Pulsar topic of the output destination stream.
            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
            gate_in_data: The gating function applied to input data stream (but not the signal stream).
            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
            gate_in_signal: The gating function applied to input signal stream.
            gate_out_signal: The gating function applied to output signal stream.
            model_id: The unique identifier of the model.
            header_size: Size in bytes of the message header used by `GraphCodec`. Defaults to 32 when `None`.
            ftp: When set to `True`, lazy routing mode is enabled.
            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
            no_overlap: When set to `True`, we ensure that every message is at most processed once.
            min_interval_ms: The minimum time interval between two consecutive runs.
            log_path: Path to store the replay log. When set to `None`, log is disabled.
            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
            acknowledge: When set to `True`, the model acknowledges every message it receives.
            receiver_queue_size: The size of the receiver queue for Pulsar.
        """
        self.client = pulsar.Client(pulsar_node)
        self.producer_destination = self.client.create_producer(topic_out_destination,
                                                                schema=pulsar.schema.BytesSchema())
        self.topic_out_signal = topic_out_signal
        # producer_signal only exists when an output signal topic is configured;
        # __next__ guards access with `self.topic_out_signal`.
        if topic_out_signal:
            self.producer_signal = self.client.create_producer(topic_out_signal,
                                                               schema=pulsar.schema.BytesSchema())
        self.task = task
        self.topic_in_data = topic_in_data
        self.topic_in_signal = topic_in_signal
        self.consumer = self._subscribe()
        # Gates default to identity functions so callers may omit any of them.
        self.gate_in_data = (lambda x: x) if gate_in_data is None else gate_in_data
        self.gate_out_destination = (lambda x: x) if gate_out_destination is None else gate_out_destination
        self.gate_in_signal = (lambda x: x) if gate_in_signal is None else gate_in_signal
        self.gate_out_signal = (lambda x: x) if gate_out_signal is None else gate_out_signal
        self.model_id = model_id
        self.ftp = ftp  # consider changing this name to ftp_in
        self.local_ftp_path = local_ftp_path
        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
        self.ftp_delete = ftp_delete
        # NOTE(review): `self.cached` is never read or written elsewhere in this class —
        # looks like dead state; confirm before removing.
        self.cached = dict()
        # Per-flow bookkeeping used by the replay log (_log_incoming_msg / _log_aggregation_decisions).
        self.latest_msg_uuid = dict()
        self.latest_msg_publish_time_ms = dict()
        self.latest_msg_consumed_time_ms = dict()
        # NOTE(review): max_time_diff_ms is stored but not consulted anywhere in this class.
        self.max_time_diff_ms = max_time_diff_ms
        self.no_overlap = no_overlap
        self.min_interval_ms = min_interval_ms  # prediction frequency
        self.last_run_start_ms = 0
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        self.last_log_duration_ms = -1
        self.acknowledge = acknowledge
        self.receiver_queue_size = receiver_queue_size

        self.header_size = header_size if header_size else 32
        self.graph_codec = GraphCodec(header_size=self.header_size)
        # Payloads awaiting a complete (data, signal) pair, keyed by flow_id.
        self.cached_data = dict()
        self.cached_signals = dict()

    def _subscribe(self):
        """Creates the shared Pulsar consumer over the data topic (and signal topic, if any)."""
        if self.topic_in_signal:
            # One consumer over both topics so data and signal messages interleave.
            return self.client.subscribe([self.topic_in_data, self.topic_in_signal],
                                         subscription_name='compute-sub',
                                         consumer_type=ConsumerType.Shared,
                                         schema=pulsar.schema.BytesSchema(),
                                         initial_position=InitialPosition.Earliest,
                                         receiver_queue_size=self.receiver_queue_size)

        return self.client.subscribe(self.topic_in_data, subscription_name='compute-sub',
                                     consumer_type=ConsumerType.Shared, schema=pulsar.schema.BytesSchema(),
                                     initial_position=InitialPosition.Earliest,
                                     receiver_queue_size=self.receiver_queue_size)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Closing the client also tears down its producers and consumer.
        self.client.close()

    def _try_task(self, flow_id) -> Tuple[bool, Optional[bool], Optional[uuid.UUID], Optional[bytes]]:
        """Performs actual computation.

        Args:
            flow_id: ID of the data flow to be processed.

        Returns:
            is_performed: Whether the job is actually performed.
            is_satisfied: Whether the prediction is considered satisfied. If set to `True`, the result is sent to the
                destination topic; if set to `False`, the result is sent to a more expensive model.
            msg_out_uuid: The UUID of the output message. If set to `None`, no message is sent out.
            output: The prediction result in bytes.
        """
        # When `topic_in_signal` is not `None`, we only do actual computation when both data stream and signal stream
        # from the same `flow_id` are present.
        if self.topic_in_signal and (flow_id not in self.cached_data or flow_id not in self.cached_signals):
            return False, None, None, None

        # Avoid running too frequently for expensive tasks
        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
            return False, None, None, None

        self.last_run_start_ms = time.time() * 1000

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        if self.ftp:
            # In FTP mode the cached payload is expected to be an `ftp://` URL, not raw data.
            if not self.cached_data[flow_id].startswith('ftp://'):
                raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
            local_file_path = ftp_fetch(self.cached_data[flow_id], self.local_ftp_path, memory=self.ftp_memory,
                                        delete=self.ftp_delete)
            self.cached_data[flow_id] = local_file_path

        # The task contract differs by configuration: with a signal stream it takes
        # (data, signal); without one it takes only data. Both return (is_satisfied, output).
        if self.topic_in_signal:
            is_satisfied, output = self.task(self.cached_data[flow_id], self.cached_signals[flow_id])
        else:
            is_satisfied, output = self.task(self.cached_data[flow_id])
        last_run_finish_ms = time.time() * 1000

        # Non-memory FTP mode: persist the output locally and pass a global path downstream.
        if self.ftp and not self.ftp_memory:
            output = self._write_ftp(output, local_file_path, last_run_finish_ms)

        msg_out_uuid = uuid.uuid4()
        # If log_path is not None, we write aggregation decisions to a log file.
        if self.log_path and os.path.isdir(self.log_path):
            self._log_aggregation_decisions(flow_id, last_run_finish_ms, is_satisfied, msg_out_uuid, output)

        self._clear_cache(flow_id)
        return True, is_satisfied, msg_out_uuid, output

    def _write_ftp(self, output, local_file_path, last_run_finish_ms):
        """Writes `output` next to the fetched input and returns its globally addressable path."""
        # For now, use the completion timestamp as the filename of output FTP file
        ftp_output_dir = os.path.join(os.path.dirname(local_file_path), 'ftp_output')
        pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
        # NOTE(review): opened in text mode, so `output` is assumed to be str here —
        # confirm the task's output type in non-memory FTP mode.
        with open(os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp', 'w') as f:
            f.write(output)
        output = os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp'
        return local_to_global_path(output, self.local_ftp_path)

    def _clear_cache(self, flow_id):
        """Drops the cached data (and signal, if any) for a flow once it has been served."""
        del self.cached_data[flow_id]
        if self.topic_in_signal:
            del self.cached_signals[flow_id]

    def _log_incoming_msg(self, msg_in_uuid, flow_id, msg_in):
        """Records per-flow consume time, publish timestamps, and message UUIDs for replay logging."""
        self.latest_msg_consumed_time_ms[flow_id] = time.time() * 1000
        if flow_id not in self.latest_msg_publish_time_ms:
            self.latest_msg_publish_time_ms[flow_id] = [msg_in.publish_timestamp()]
            self.latest_msg_uuid[flow_id] = [str(msg_in_uuid)]
        else:
            self.latest_msg_publish_time_ms[flow_id].append(msg_in.publish_timestamp())
            self.latest_msg_uuid[flow_id].append(str(msg_in_uuid))

    def _log_aggregation_decisions(self, flow_id, last_run_finish_ms, is_satisfied, msg_out_uuid, msg_out_payload):
        """Appends one pickled record describing this run to the replay log file.

        The log file holds back-to-back pickle frames; read it back with repeated `pickle.load`.
        """
        replay_log = {'msg_in_uuid': self.latest_msg_uuid,
                      'msg_in_publish_time_ms': self.latest_msg_publish_time_ms,
                      'msg_in_consumed_time_ms': self.latest_msg_consumed_time_ms,
                      'start_compute_time_ms': self.last_run_start_ms,
                      'finish_compute_time_ms': last_run_finish_ms,
                      'last_log_duration_ms': self.last_log_duration_ms,
                      'data_in_payload': self.cached_data[flow_id],
                      'signal_in_payload': self.cached_signals[flow_id] if self.topic_in_signal else None,
                      'msg_out_is_satisfied': is_satisfied,
                      'msg_out_uuid': msg_out_uuid,
                      'msg_out_payload': msg_out_payload}
        with open(os.path.join(self.log_path, self.log_filename + '.log'), 'ab') as f:
            pickle.dump(replay_log, f)
        self.last_log_duration_ms = time.time() * 1000 - last_run_finish_ms

        # If no_overlap, reset latest_msg and latest_msg_time_ms so a message won't be processed twice.
        if self.no_overlap:
            self.latest_msg_uuid = dict()
            self.latest_msg_publish_time_ms = dict()
            self.latest_msg_consumed_time_ms = dict()

    def __iter__(self):
        return self

    def __next__(self):
        """Receives one message, caches it by flow, and serves the task when the flow is ready.

        Returns the gated output payload when a prediction was produced and sent, else None.
        """
        msg_in = self.consumer.receive()
        msg_in_uuid, op_from, flow_id, payload = self.graph_codec.decode(msg_in.value())
        # Pulsar topic names may be fully qualified; compare both the short and full forms below.
        actual_topic_in = msg_in.topic_name().split('/')[-1]

        if actual_topic_in == self.topic_in_signal or msg_in.topic_name() == self.topic_in_signal:
            # We locally cache the prediction result ("signal") from another model.
            self.cached_signals[flow_id] = self.gate_in_signal(payload)
        elif actual_topic_in == self.topic_in_data or msg_in.topic_name() == self.topic_in_data:
            self.cached_data[flow_id] = self.gate_in_data(payload)
        else:
            raise ValueError(
                "The consumer's topic name does not match that of incoming message. The topic of incoming message is",
                msg_in.topic_name())

        if self.log_path and os.path.isdir(self.log_path):
            self._log_incoming_msg(msg_in_uuid, flow_id, msg_in)

        is_performed, is_satisfied, msg_out_uuid, output = self._try_task(flow_id)
        # NOTE(review): when the task ran, output is always gated by gate_out_destination even
        # if it is later sent to the signal topic (unsatisfied branch below); when the task did
        # not run, output is None and gate_out_signal gates None. Confirm whether gate_out_signal
        # was intended for the unsatisfied-result path instead.
        if is_performed:
            output = self.gate_out_destination(output)
        else:
            output = self.gate_out_signal(output)

        if output:
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            if is_satisfied:
                self.producer_destination.send(self.graph_codec.encode(msg_out_uuid, self.model_id, output, flow_id))
            elif self.topic_out_signal:
                # Unsatisfied result: escalate to the more expensive model via the signal topic.
                self.producer_signal.send(self.graph_codec.encode(msg_out_uuid, self.model_id, output, flow_id))
            else:
                raise ValueError("The result is not satisfactory but output signal topic is not present.")

        if self.acknowledge:
            self.consumer.acknowledge(msg_in)

        return output if output else None
# NOTE(review): duplicate rendering of the same Model class defined earlier on this page.
# This version intentionally drops schema support. Data sources cannot be combined adaptively.
class Model:
    """A model that takes in two topics and outputs to two topics.

    For input topics, one is the original data stream and the other is an intermittent signal stream.
    The original data stream is cached locally, waiting to be served.
    Actual model serving is only performed when a request is received from the intermittent signal stream.
    The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO;
    otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
    """

    def __init__(self,
                 task: Callable,
                 pulsar_node: str,
                 topic_in_data: str,
                 topic_out_destination: str,
                 topic_in_signal: Optional[str] = None,
                 topic_out_signal: Optional[str] = None,
                 gate_in_data: Optional[Callable] = None,
                 gate_out_destination: Optional[Callable] = None,
                 gate_in_signal: Optional[Callable] = None,
                 gate_out_signal: Optional[Callable] = None,
                 model_id: Optional[str] = 'model1',
                 header_size: Optional[int] = None,
                 ftp: Optional[bool] = False,
                 ftp_memory: Optional[bool] = False,
                 ftp_delete: Optional[bool] = False,
                 local_ftp_path: Optional[str] = '/srv/ftp/',
                 max_time_diff_ms: Optional[int] = 10 * 1000,
                 no_overlap: Optional[bool] = False,
                 min_interval_ms: Optional[int] = 0,
                 log_path: Optional[str] = None,
                 log_filename: Optional[str] = None,
                 acknowledge: Optional[bool] = False,
                 receiver_queue_size: Optional[int] = 10000) -> None:
        """Initializes a model.

        Args:
            task: The actual task to be performed.
            pulsar_node: Address of Apache Pulsar server.
            topic_in_data: Pulsar topic of the input data stream.
            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
            topic_out_destination: Pulsar topic of the output destination stream.
            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
            gate_in_data: The gating function applied to input data stream (but not the signal stream).
            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
            gate_in_signal: The gating function applied to input signal stream.
            gate_out_signal: The gating function applied to output signal stream.
            model_id: The unique identifier of the model.
            header_size: Size in bytes of the message header used by `GraphCodec`. Defaults to 32 when `None`.
            ftp: When set to `True`, lazy routing mode is enabled.
            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
            no_overlap: When set to `True`, we ensure that every message is at most processed once.
            min_interval_ms: The minimum time interval between two consecutive runs.
            log_path: Path to store the replay log. When set to `None`, log is disabled.
            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
            acknowledge: When set to `True`, the model acknowledges every message it receives.
            receiver_queue_size: The size of the receiver queue for Pulsar.
        """
        self.client = pulsar.Client(pulsar_node)
        self.producer_destination = self.client.create_producer(topic_out_destination,
                                                                schema=pulsar.schema.BytesSchema())
        self.topic_out_signal = topic_out_signal
        # producer_signal only exists when an output signal topic is configured.
        if topic_out_signal:
            self.producer_signal = self.client.create_producer(topic_out_signal,
                                                               schema=pulsar.schema.BytesSchema())
        self.task = task
        self.topic_in_data = topic_in_data
        self.topic_in_signal = topic_in_signal
        self.consumer = self._subscribe()
        # Gates default to identity functions so callers may omit any of them.
        self.gate_in_data = (lambda x: x) if gate_in_data is None else gate_in_data
        self.gate_out_destination = (lambda x: x) if gate_out_destination is None else gate_out_destination
        self.gate_in_signal = (lambda x: x) if gate_in_signal is None else gate_in_signal
        self.gate_out_signal = (lambda x: x) if gate_out_signal is None else gate_out_signal
        self.model_id = model_id
        self.ftp = ftp  # consider changing this name to ftp_in
        self.local_ftp_path = local_ftp_path
        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
        self.ftp_delete = ftp_delete
        # NOTE(review): `self.cached` is never read elsewhere in this class — possibly dead state.
        self.cached = dict()
        # Per-flow bookkeeping used by the replay log.
        self.latest_msg_uuid = dict()
        self.latest_msg_publish_time_ms = dict()
        self.latest_msg_consumed_time_ms = dict()
        self.max_time_diff_ms = max_time_diff_ms
        self.no_overlap = no_overlap
        self.min_interval_ms = min_interval_ms  # prediction frequency
        self.last_run_start_ms = 0
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        self.last_log_duration_ms = -1
        self.acknowledge = acknowledge
        self.receiver_queue_size = receiver_queue_size

        self.header_size = header_size if header_size else 32
        self.graph_codec = GraphCodec(header_size=self.header_size)
        # Payloads awaiting a complete (data, signal) pair, keyed by flow_id.
        self.cached_data = dict()
        self.cached_signals = dict()

    def _subscribe(self):
        """Creates the shared Pulsar consumer over the data topic (and signal topic, if any)."""
        if self.topic_in_signal:
            # One consumer over both topics so data and signal messages interleave.
            return self.client.subscribe([self.topic_in_data, self.topic_in_signal],
                                         subscription_name='compute-sub',
                                         consumer_type=ConsumerType.Shared,
                                         schema=pulsar.schema.BytesSchema(),
                                         initial_position=InitialPosition.Earliest,
                                         receiver_queue_size=self.receiver_queue_size)

        return self.client.subscribe(self.topic_in_data, subscription_name='compute-sub',
                                     consumer_type=ConsumerType.Shared, schema=pulsar.schema.BytesSchema(),
                                     initial_position=InitialPosition.Earliest,
                                     receiver_queue_size=self.receiver_queue_size)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Closing the client also tears down its producers and consumer.
        self.client.close()

    def _try_task(self, flow_id) -> Tuple[bool, Optional[bool], Optional[uuid.UUID], Optional[bytes]]:
        """Performs actual computation.

        Args:
            flow_id: ID of the data flow to be processed.

        Returns:
            is_performed: Whether the job is actually performed.
            is_satisfied: Whether the prediction is considered satisfied. If set to `True`, the result is sent to the
                destination topic; if set to `False`, the result is sent to a more expensive model.
            msg_out_uuid: The UUID of the output message. If set to `None`, no message is sent out.
            output: The prediction result in bytes.
        """
        # When `topic_in_signal` is not `None`, we only do actual computation when both data stream and signal stream
        # from the same `flow_id` are present.
        if self.topic_in_signal and (flow_id not in self.cached_data or flow_id not in self.cached_signals):
            return False, None, None, None

        # Avoid running too frequently for expensive tasks
        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
            return False, None, None, None

        self.last_run_start_ms = time.time() * 1000

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        if self.ftp:
            # In FTP mode the cached payload is expected to be an `ftp://` URL, not raw data.
            if not self.cached_data[flow_id].startswith('ftp://'):
                raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
            local_file_path = ftp_fetch(self.cached_data[flow_id], self.local_ftp_path, memory=self.ftp_memory,
                                        delete=self.ftp_delete)
            self.cached_data[flow_id] = local_file_path

        # Task contract: with a signal stream it takes (data, signal); otherwise only data.
        if self.topic_in_signal:
            is_satisfied, output = self.task(self.cached_data[flow_id], self.cached_signals[flow_id])
        else:
            is_satisfied, output = self.task(self.cached_data[flow_id])
        last_run_finish_ms = time.time() * 1000

        # Non-memory FTP mode: persist the output locally and pass a global path downstream.
        if self.ftp and not self.ftp_memory:
            output = self._write_ftp(output, local_file_path, last_run_finish_ms)

        msg_out_uuid = uuid.uuid4()
        # If log_path is not None, we write aggregation decisions to a log file.
        if self.log_path and os.path.isdir(self.log_path):
            self._log_aggregation_decisions(flow_id, last_run_finish_ms, is_satisfied, msg_out_uuid, output)

        self._clear_cache(flow_id)
        return True, is_satisfied, msg_out_uuid, output

    def _write_ftp(self, output, local_file_path, last_run_finish_ms):
        """Writes `output` next to the fetched input and returns its globally addressable path."""
        # For now, use the completion timestamp as the filename of output FTP file
        ftp_output_dir = os.path.join(os.path.dirname(local_file_path), 'ftp_output')
        pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
        # NOTE(review): text-mode write assumes `output` is str — confirm task output type.
        with open(os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp', 'w') as f:
            f.write(output)
        output = os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp'
        return local_to_global_path(output, self.local_ftp_path)

    def _clear_cache(self, flow_id):
        """Drops the cached data (and signal, if any) for a flow once it has been served."""
        del self.cached_data[flow_id]
        if self.topic_in_signal:
            del self.cached_signals[flow_id]

    def _log_incoming_msg(self, msg_in_uuid, flow_id, msg_in):
        """Records per-flow consume time, publish timestamps, and message UUIDs for replay logging."""
        self.latest_msg_consumed_time_ms[flow_id] = time.time() * 1000
        if flow_id not in self.latest_msg_publish_time_ms:
            self.latest_msg_publish_time_ms[flow_id] = [msg_in.publish_timestamp()]
            self.latest_msg_uuid[flow_id] = [str(msg_in_uuid)]
        else:
            self.latest_msg_publish_time_ms[flow_id].append(msg_in.publish_timestamp())
            self.latest_msg_uuid[flow_id].append(str(msg_in_uuid))

    def _log_aggregation_decisions(self, flow_id, last_run_finish_ms, is_satisfied, msg_out_uuid, msg_out_payload):
        """Appends one pickled record describing this run to the replay log file.

        The log file holds back-to-back pickle frames; read it back with repeated `pickle.load`.
        """
        replay_log = {'msg_in_uuid': self.latest_msg_uuid,
                      'msg_in_publish_time_ms': self.latest_msg_publish_time_ms,
                      'msg_in_consumed_time_ms': self.latest_msg_consumed_time_ms,
                      'start_compute_time_ms': self.last_run_start_ms,
                      'finish_compute_time_ms': last_run_finish_ms,
                      'last_log_duration_ms': self.last_log_duration_ms,
                      'data_in_payload': self.cached_data[flow_id],
                      'signal_in_payload': self.cached_signals[flow_id] if self.topic_in_signal else None,
                      'msg_out_is_satisfied': is_satisfied,
                      'msg_out_uuid': msg_out_uuid,
                      'msg_out_payload': msg_out_payload}
        with open(os.path.join(self.log_path, self.log_filename + '.log'), 'ab') as f:
            pickle.dump(replay_log, f)
        self.last_log_duration_ms = time.time() * 1000 - last_run_finish_ms

        # If no_overlap, reset latest_msg and latest_msg_time_ms so a message won't be processed twice.
        if self.no_overlap:
            self.latest_msg_uuid = dict()
            self.latest_msg_publish_time_ms = dict()
            self.latest_msg_consumed_time_ms = dict()

    def __iter__(self):
        return self

    def __next__(self):
        """Receives one message, caches it by flow, and serves the task when the flow is ready.

        Returns the gated output payload when a prediction was produced and sent, else None.
        """
        msg_in = self.consumer.receive()
        msg_in_uuid, op_from, flow_id, payload = self.graph_codec.decode(msg_in.value())
        # Pulsar topic names may be fully qualified; compare both the short and full forms below.
        actual_topic_in = msg_in.topic_name().split('/')[-1]

        if actual_topic_in == self.topic_in_signal or msg_in.topic_name() == self.topic_in_signal:
            # We locally cache the prediction result ("signal") from another model.
            self.cached_signals[flow_id] = self.gate_in_signal(payload)
        elif actual_topic_in == self.topic_in_data or msg_in.topic_name() == self.topic_in_data:
            self.cached_data[flow_id] = self.gate_in_data(payload)
        else:
            raise ValueError(
                "The consumer's topic name does not match that of incoming message. The topic of incoming message is",
                msg_in.topic_name())

        if self.log_path and os.path.isdir(self.log_path):
            self._log_incoming_msg(msg_in_uuid, flow_id, msg_in)

        is_performed, is_satisfied, msg_out_uuid, output = self._try_task(flow_id)
        # NOTE(review): when the task ran, output is always gated by gate_out_destination even
        # if it is later sent to the signal topic below; when it did not run, output is None.
        # Confirm whether gate_out_signal was intended for the unsatisfied-result path.
        if is_performed:
            output = self.gate_out_destination(output)
        else:
            output = self.gate_out_signal(output)

        if output:
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            if is_satisfied:
                self.producer_destination.send(self.graph_codec.encode(msg_out_uuid, self.model_id, output, flow_id))
            elif self.topic_out_signal:
                # Unsatisfied result: escalate to the more expensive model via the signal topic.
                self.producer_signal.send(self.graph_codec.encode(msg_out_uuid, self.model_id, output, flow_id))
            else:
                raise ValueError("The result is not satisfactory but output signal topic is not present.")

        if self.acknowledge:
            self.consumer.acknowledge(msg_in)

        return output if output else None
A model that takes in two topics and outputs to two topics. For input topics, one is the original data stream and the other is an intermittent signal stream. The original data stream is cached locally, waiting to be served. Actual model serving is only performed when a request is received from the intermittent signal stream. The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO; otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
Model( task: Callable, pulsar_node: str, topic_in_data: str, topic_out_destination: str, topic_in_signal: Optional[str] = None, topic_out_signal: Optional[str] = None, gate_in_data: Optional[Callable] = None, gate_out_destination: Optional[Callable] = None, gate_in_signal: Optional[Callable] = None, gate_out_signal: Optional[Callable] = None, model_id: Optional[str] = 'model1', header_size: Optional[int] = None, ftp: Optional[bool] = False, ftp_memory: Optional[bool] = False, ftp_delete: Optional[bool] = False, local_ftp_path: Optional[str] = '/srv/ftp/', max_time_diff_ms: Optional[int] = 10000, no_overlap: Optional[bool] = False, min_interval_ms: Optional[int] = 0, log_path: Optional[str] = None, log_filename: Optional[str] = None, acknowledge: Optional[bool] = False, receiver_queue_size: Optional[int] = 10000)
# NOTE(review): partial duplicate rendering — this is the same Model.__init__ shown above.
def __init__(self,
             task: Callable,
             pulsar_node: str,
             topic_in_data: str,
             topic_out_destination: str,
             topic_in_signal: Optional[str] = None,
             topic_out_signal: Optional[str] = None,
             gate_in_data: Optional[Callable] = None,
             gate_out_destination: Optional[Callable] = None,
             gate_in_signal: Optional[Callable] = None,
             gate_out_signal: Optional[Callable] = None,
             model_id: Optional[str] = 'model1',
             header_size: Optional[int] = None,
             ftp: Optional[bool] = False,
             ftp_memory: Optional[bool] = False,
             ftp_delete: Optional[bool] = False,
             local_ftp_path: Optional[str] = '/srv/ftp/',
             max_time_diff_ms: Optional[int] = 10 * 1000,
             no_overlap: Optional[bool] = False,
             min_interval_ms: Optional[int] = 0,
             log_path: Optional[str] = None,
             log_filename: Optional[str] = None,
             acknowledge: Optional[bool] = False,
             receiver_queue_size: Optional[int] = 10000) -> None:
    """Initializes a model.

    Args:
        task: The actual task to be performed.
        pulsar_node: Address of Apache Pulsar server.
        topic_in_data: Pulsar topic of the input data stream.
        topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
        topic_out_destination: Pulsar topic of the output destination stream.
        topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
        gate_in_data: The gating function applied to input data stream (but not the signal stream).
        gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
        gate_in_signal: The gating function applied to input signal stream.
        gate_out_signal: The gating function applied to output signal stream.
        model_id: The unique identifier of the model.
        header_size: Size in bytes of the message header used by `GraphCodec`. Defaults to 32 when `None`.
        ftp: When set to `True`, lazy routing mode is enabled.
        ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
        ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
        local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
        max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
        no_overlap: When set to `True`, we ensure that every message is at most processed once.
        min_interval_ms: The minimum time interval between two consecutive runs.
        log_path: Path to store the replay log. When set to `None`, log is disabled.
        log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
        acknowledge: When set to `True`, the model acknowledges every message it receives.
        receiver_queue_size: The size of the receiver queue for Pulsar.
    """
    self.client = pulsar.Client(pulsar_node)
    self.producer_destination = self.client.create_producer(topic_out_destination,
                                                            schema=pulsar.schema.BytesSchema())
    self.topic_out_signal = topic_out_signal
    # producer_signal only exists when an output signal topic is configured.
    if topic_out_signal:
        self.producer_signal = self.client.create_producer(topic_out_signal,
                                                           schema=pulsar.schema.BytesSchema())
    self.task = task
    self.topic_in_data = topic_in_data
    self.topic_in_signal = topic_in_signal
    self.consumer = self._subscribe()
    # Gates default to identity functions so callers may omit any of them.
    self.gate_in_data = (lambda x: x) if gate_in_data is None else gate_in_data
    self.gate_out_destination = (lambda x: x) if gate_out_destination is None else gate_out_destination
    self.gate_in_signal = (lambda x: x) if gate_in_signal is None else gate_in_signal
    self.gate_out_signal = (lambda x: x) if gate_out_signal is None else gate_out_signal
    self.model_id = model_id
    self.ftp = ftp  # consider changing this name to ftp_in
    self.local_ftp_path = local_ftp_path
    self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
    self.ftp_delete = ftp_delete
    # NOTE(review): `self.cached` is never read elsewhere in this class — possibly dead state.
    self.cached = dict()
    # Per-flow bookkeeping used by the replay log.
    self.latest_msg_uuid = dict()
    self.latest_msg_publish_time_ms = dict()
    self.latest_msg_consumed_time_ms = dict()
    self.max_time_diff_ms = max_time_diff_ms
    self.no_overlap = no_overlap
    self.min_interval_ms = min_interval_ms  # prediction frequency
    self.last_run_start_ms = 0
    self.log_path = log_path
    self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
    self.last_log_duration_ms = -1
    self.acknowledge = acknowledge
    self.receiver_queue_size = receiver_queue_size

    self.header_size = header_size if header_size else 32
    self.graph_codec = GraphCodec(header_size=self.header_size)
    # Payloads awaiting a complete (data, signal) pair, keyed by flow_id.
    self.cached_data = dict()
    self.cached_signals = dict()
Initializes a model.
Arguments:
- task: The actual task to be performed.
- pulsar_node: Address of Apache Pulsar server.
- topic_in_data: Pulsar topic of the input data stream.
- topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
- topic_out_destination: Pulsar topic of the output destination stream.
- topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
- gate_in_data: The gating function applied to input data stream (but not the signal stream).
- gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
- gate_in_signal: The gating function applied to input signal stream.
- gate_out_signal: The gating function applied to output signal stream.
- model_id: The unique identifier of the model.
- ftp: When set to `True`, lazy routing mode is enabled.
- ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
- ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
- local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
- max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
- no_overlap: When set to `True`, we ensure that every message is processed at most once.
- min_interval_ms: The minimum time interval between two consecutive runs.
- log_path: Path to store the replay log. When set to `None`, logging is disabled.
- log_filename: File name of the replay log. When set to `None`, the current timestamp is used as the file name.
- acknowledge: When set to `True`, the model acknowledges every message it receives.
- receiver_queue_size: The size of the receiver queue for Pulsar.