edgeserve.batch_model
import os
import time
from typing import Callable, Optional, Tuple, Set

import pulsar
import pickle
import pathlib
from _pulsar import InitialPosition, ConsumerType

from edgeserve.util import ftp_fetch, local_to_global_path
from edgeserve.message_format import NetworkCodec
from edgeserve.model import Model


# This version intentionally drops schema support. Data sources cannot be combined adaptively.
class BatchModel(Model):
    """A model that takes in two topics and outputs to two topics.

    For input topics, one is the original data stream and the other is an intermittent signal stream.
    The original data stream is cached locally, waiting to be served.
    Actual model serving is only performed when a request is received from the intermittent signal stream.
    The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO;
    otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
    """

    def __init__(self,
                 task: Callable,
                 pulsar_node: str,
                 topic_in_data: str,
                 topic_out_destination: str,
                 topic_in_signal: Optional[str] = None,
                 topic_out_signal: Optional[str] = None,
                 gate_in_data: Optional[Callable] = None,
                 gate_out_destination: Optional[Callable] = None,
                 gate_in_signal: Optional[Callable] = None,
                 gate_out_signal: Optional[Callable] = None,
                 header_size: Optional[int] = None,
                 ftp: Optional[bool] = False,
                 ftp_memory: Optional[bool] = False,
                 ftp_delete: Optional[bool] = False,
                 local_ftp_path: Optional[str] = '/srv/ftp/',
                 max_time_diff_ms: Optional[int] = 10 * 1000,
                 no_overlap: Optional[bool] = False,
                 min_interval_ms: Optional[int] = 0,
                 log_path: Optional[str] = None,
                 log_filename: Optional[str] = None,
                 acknowledge: Optional[bool] = False,
                 batch_max_num_msg: Optional[int] = 100,
                 batch_max_bytes: Optional[int] = 10 * 1024 * 1024,
                 batch_timeout_ms: Optional[int] = 10) -> None:
        """Initializes a model.

        Args:
            task: The actual task to be performed, in the format of a callable method.
                Arg:
                    batch_input: a dict of (flow_id: int -> (data, signal)), or (flow_id -> data) if no signal present.
                Returns:
                    is_satisfied: a dict of (flow_id: int -> bool) indicating whether each flow_id is satisfied.
                    output: a dict of (flow_id: int -> bytes) indicating model output for each flow_id.
            pulsar_node: Address of Apache Pulsar server.
            topic_in_data: Pulsar topic of the input data stream.
            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
            topic_out_destination: Pulsar topic of the output destination stream.
            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
            gate_in_data: The gating function applied to input data stream (but not the signal stream).
            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
            gate_in_signal: The gating function applied to input signal stream.
            gate_out_signal: The gating function applied to output signal stream.
            header_size: Message header size, forwarded to the parent `Model` constructor.
            ftp: When set to `True`, lazy routing mode is enabled.
            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
            no_overlap: When set to `True`, we ensure that every message is at most processed once.
            min_interval_ms: The minimum time interval between two consecutive runs.
            log_path: Path to store the replay log. When set to `None`, log is disabled.
            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
            acknowledge: When set to `True`, consumed messages are acknowledged after each batch is processed.
            batch_max_num_msg: The maximum number of messages to include in a single batch.
            batch_max_bytes: The maximum size of messages to include in a single batch.
            batch_timeout_ms: The maximum timeout to wait (in ms) for enough messages for this batch.
        """
        super().__init__(task, pulsar_node, topic_in_data, topic_out_destination, topic_in_signal, topic_out_signal,
                         gate_in_data, gate_out_destination, gate_in_signal, gate_out_signal, header_size, ftp,
                         ftp_memory, ftp_delete, local_ftp_path, max_time_diff_ms, no_overlap, min_interval_ms,
                         log_path, log_filename, acknowledge)
        self.batch_max_num_msg = batch_max_num_msg
        self.batch_max_bytes = batch_max_bytes
        self.batch_timeout_ms = batch_timeout_ms

    def _subscribe(self):
        """Subscribes to the data topic (and the signal topic, when present) with a batch-receive policy."""
        if self.topic_in_signal:
            return self.client.subscribe([self.topic_in_data, self.topic_in_signal],
                                         subscription_name='compute-sub',
                                         consumer_type=ConsumerType.Shared,
                                         schema=pulsar.schema.BytesSchema(),
                                         initial_position=InitialPosition.Earliest,
                                         batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(self.batch_max_num_msg,
                                                                                                self.batch_max_bytes,
                                                                                                self.batch_timeout_ms))

        return self.client.subscribe(self.topic_in_data, subscription_name='compute-sub',
                                     consumer_type=ConsumerType.Shared, schema=pulsar.schema.BytesSchema(),
                                     initial_position=InitialPosition.Earliest,
                                     batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(self.batch_max_num_msg,
                                                                                            self.batch_max_bytes,
                                                                                            self.batch_timeout_ms))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the Pulsar client (and its producers/consumers) on context exit.
        self.client.close()

    def _try_tasks(self, flow_ids: Set[int]) -> Tuple[bool, Optional[dict], Optional[dict]]:
        """Performs actual computation.

        Args:
            flow_ids: Set of IDs of the data flows to be processed.

        Returns:
            is_performed: Whether the job is actually performed.
            is_satisfied: Whether the prediction is considered satisfied for each flow_id in a (flow_id -> bool) dict.
                If set to `True`, the result is sent to the destination topic;
                if set to `False`, the result is sent to a more expensive model.
            outputs: The prediction results in the format of dict of (flow_id -> bytes).
        """
        # Avoid running too frequently for expensive tasks
        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
            return False, None, None

        self.last_run_start_ms = time.time() * 1000

        # When `topic_in_signal` is present, we only do actual computation when both data stream and signal stream
        # from the same `flow_id` are present.
        if self.topic_in_signal:
            flow_ids = flow_ids & self.cached_data.keys() & self.cached_signals.keys()

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        # FIX: initialize so the `_write_ftp` call below cannot raise NameError when `flow_ids` is empty.
        local_file_path = None
        if self.ftp:
            for flow_id in flow_ids:
                if not self.cached_data[flow_id].startswith('ftp://'):
                    raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
                local_file_path = ftp_fetch(self.cached_data[flow_id], self.local_ftp_path, memory=self.ftp_memory,
                                            delete=self.ftp_delete)
                self.cached_data[flow_id] = local_file_path

        cached_data_filtered = {flow_id: self.cached_data[flow_id] for flow_id in flow_ids}
        if self.topic_in_signal:
            cached_tuple_filtered = {flow_id: (self.cached_data[flow_id],
                                               self.cached_signals[flow_id]) for flow_id in flow_ids}
            is_satisfied, output = self.task(cached_tuple_filtered)
        else:
            is_satisfied, output = self.task(cached_data_filtered)
        last_run_finish_ms = time.time() * 1000

        if self.ftp and not self.ftp_memory:
            # NOTE(review): only the last fetched flow's path is forwarded here — confirm `_write_ftp` semantics.
            output = self._write_ftp(output, local_file_path, last_run_finish_ms)

        self._clear_cache(flow_ids)
        return True, is_satisfied, output

    def _clear_cache(self, flow_ids):
        """Drops cached data (and signals, when a signal topic exists) for the processed flow IDs."""
        for flow_id in flow_ids:
            del self.cached_data[flow_id]
            if self.topic_in_signal:
                del self.cached_signals[flow_id]

    @staticmethod
    def _send_async_callback(res, msg_id):
        # FIX: the original definition had neither `self` nor @staticmethod, so Pulsar's (res, msg_id)
        # invocation of the bound method raised TypeError; it also passed logging-style %s placeholders
        # to print() without interpolating them.
        print('Message published res=%s msg_id=%s' % (res, msg_id))

    def __iter__(self):
        return self

    def __next__(self):
        """Receives one batch of messages, caches them per flow, runs the task, and routes the outputs."""
        messages = self.consumer.batch_receive()
        while len(messages) == 0:
            # batch_receive() returns an empty list on timeout; keep polling until a batch arrives.
            messages = self.consumer.batch_receive()

        flow_ids = set()
        for msg in messages:

            flow_id, payload = self.network_codec.decode(msg.data())
            actual_topic_in = msg.topic_name().split('/')[-1]

            if actual_topic_in == self.topic_in_signal or msg.topic_name() == self.topic_in_signal:
                # We locally cache the prediction result ("signal") from another model.
                self.cached_signals[flow_id] = self.gate_in_signal(payload)
            elif actual_topic_in == self.topic_in_data or msg.topic_name() == self.topic_in_data:
                self.cached_data[flow_id] = self.gate_in_data(payload)
            else:
                # FIX: the original passed two positional args to ValueError, producing a tuple-shaped message.
                raise ValueError("The consumer's topic name does not match that of incoming message. "
                                 "The topic of incoming message is " + msg.topic_name())

            flow_ids.add(flow_id)

            if self.log_path and os.path.isdir(self.log_path):
                self._log_incoming_msg(flow_id, msg)

            self.latest_msg_consumed_time_ms[flow_id] = time.time() * 1000

        is_performed, is_satisfied, outputs = self._try_tasks(flow_ids)
        if not is_performed:
            return None

        do_return = False
        for flow_id in outputs.keys():
            if not outputs[flow_id]:
                continue
            do_return = True
            output = self.gate_out_destination(outputs[flow_id]) if is_satisfied[flow_id] else self.gate_out_signal(
                outputs[flow_id])
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            if is_satisfied[flow_id]:
                self.producer_destination.send_async(self.network_codec.encode([flow_id, output]),
                                                     self._send_async_callback)
            elif self.topic_out_signal:
                self.producer_signal.send_async(self.network_codec.encode([flow_id, output]), self._send_async_callback)
            else:
                raise ValueError("The result is not satisfactory but output signal topic is not present.")

        if self.acknowledge:
            for message in messages:
                self.consumer.acknowledge(message)

        return outputs if do_return else None
class BatchModel(Model):
    """A model that takes in two topics and outputs to two topics.

    For input topics, one is the original data stream and the other is an intermittent signal stream.
    The original data stream is cached locally, waiting to be served.
    Actual model serving is only performed when a request is received from the intermittent signal stream.
    The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO;
    otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
    """

    def __init__(self,
                 task: Callable,
                 pulsar_node: str,
                 topic_in_data: str,
                 topic_out_destination: str,
                 topic_in_signal: Optional[str] = None,
                 topic_out_signal: Optional[str] = None,
                 gate_in_data: Optional[Callable] = None,
                 gate_out_destination: Optional[Callable] = None,
                 gate_in_signal: Optional[Callable] = None,
                 gate_out_signal: Optional[Callable] = None,
                 header_size: Optional[int] = None,
                 ftp: Optional[bool] = False,
                 ftp_memory: Optional[bool] = False,
                 ftp_delete: Optional[bool] = False,
                 local_ftp_path: Optional[str] = '/srv/ftp/',
                 max_time_diff_ms: Optional[int] = 10 * 1000,
                 no_overlap: Optional[bool] = False,
                 min_interval_ms: Optional[int] = 0,
                 log_path: Optional[str] = None,
                 log_filename: Optional[str] = None,
                 acknowledge: Optional[bool] = False,
                 batch_max_num_msg: Optional[int] = 100,
                 batch_max_bytes: Optional[int] = 10 * 1024 * 1024,
                 batch_timeout_ms: Optional[int] = 10) -> None:
        """Initializes a model.

        Args:
            task: The actual task to be performed, in the format of a callable method.
                Arg:
                    batch_input: a dict of (flow_id: int -> (data, signal)), or (flow_id -> data) if no signal present.
                Returns:
                    is_satisfied: a dict of (flow_id: int -> bool) indicating whether each flow_id is satisfied.
                    output: a dict of (flow_id: int -> bytes) indicating model output for each flow_id.
            pulsar_node: Address of Apache Pulsar server.
            topic_in_data: Pulsar topic of the input data stream.
            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
            topic_out_destination: Pulsar topic of the output destination stream.
            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
            gate_in_data: The gating function applied to input data stream (but not the signal stream).
            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
            gate_in_signal: The gating function applied to input signal stream.
            gate_out_signal: The gating function applied to output signal stream.
            header_size: Message header size, forwarded to the parent `Model` constructor.
            ftp: When set to `True`, lazy routing mode is enabled.
            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
            no_overlap: When set to `True`, we ensure that every message is at most processed once.
            min_interval_ms: The minimum time interval between two consecutive runs.
            log_path: Path to store the replay log. When set to `None`, log is disabled.
            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
            acknowledge: When set to `True`, consumed messages are acknowledged after each batch is processed.
            batch_max_num_msg: The maximum number of messages to include in a single batch.
            batch_max_bytes: The maximum size of messages to include in a single batch.
            batch_timeout_ms: The maximum timeout to wait (in ms) for enough messages for this batch.
        """
        super().__init__(task, pulsar_node, topic_in_data, topic_out_destination, topic_in_signal, topic_out_signal,
                         gate_in_data, gate_out_destination, gate_in_signal, gate_out_signal, header_size, ftp,
                         ftp_memory, ftp_delete, local_ftp_path, max_time_diff_ms, no_overlap, min_interval_ms,
                         log_path, log_filename, acknowledge)
        self.batch_max_num_msg = batch_max_num_msg
        self.batch_max_bytes = batch_max_bytes
        self.batch_timeout_ms = batch_timeout_ms

    def _subscribe(self):
        """Subscribes to the data topic (and the signal topic, when present) with a batch-receive policy."""
        if self.topic_in_signal:
            return self.client.subscribe([self.topic_in_data, self.topic_in_signal],
                                         subscription_name='compute-sub',
                                         consumer_type=ConsumerType.Shared,
                                         schema=pulsar.schema.BytesSchema(),
                                         initial_position=InitialPosition.Earliest,
                                         batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(self.batch_max_num_msg,
                                                                                                self.batch_max_bytes,
                                                                                                self.batch_timeout_ms))

        return self.client.subscribe(self.topic_in_data, subscription_name='compute-sub',
                                     consumer_type=ConsumerType.Shared, schema=pulsar.schema.BytesSchema(),
                                     initial_position=InitialPosition.Earliest,
                                     batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(self.batch_max_num_msg,
                                                                                            self.batch_max_bytes,
                                                                                            self.batch_timeout_ms))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close the Pulsar client (and its producers/consumers) on context exit.
        self.client.close()

    def _try_tasks(self, flow_ids: Set[int]) -> Tuple[bool, Optional[dict], Optional[dict]]:
        """Performs actual computation.

        Args:
            flow_ids: Set of IDs of the data flows to be processed.

        Returns:
            is_performed: Whether the job is actually performed.
            is_satisfied: Whether the prediction is considered satisfied for each flow_id in a (flow_id -> bool) dict.
                If set to `True`, the result is sent to the destination topic;
                if set to `False`, the result is sent to a more expensive model.
            outputs: The prediction results in the format of dict of (flow_id -> bytes).
        """
        # Avoid running too frequently for expensive tasks
        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
            return False, None, None

        self.last_run_start_ms = time.time() * 1000

        # When `topic_in_signal` is present, we only do actual computation when both data stream and signal stream
        # from the same `flow_id` are present.
        if self.topic_in_signal:
            flow_ids = flow_ids & self.cached_data.keys() & self.cached_signals.keys()

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        # FIX: initialize so the `_write_ftp` call below cannot raise NameError when `flow_ids` is empty.
        local_file_path = None
        if self.ftp:
            for flow_id in flow_ids:
                if not self.cached_data[flow_id].startswith('ftp://'):
                    raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
                local_file_path = ftp_fetch(self.cached_data[flow_id], self.local_ftp_path, memory=self.ftp_memory,
                                            delete=self.ftp_delete)
                self.cached_data[flow_id] = local_file_path

        cached_data_filtered = {flow_id: self.cached_data[flow_id] for flow_id in flow_ids}
        if self.topic_in_signal:
            cached_tuple_filtered = {flow_id: (self.cached_data[flow_id],
                                               self.cached_signals[flow_id]) for flow_id in flow_ids}
            is_satisfied, output = self.task(cached_tuple_filtered)
        else:
            is_satisfied, output = self.task(cached_data_filtered)
        last_run_finish_ms = time.time() * 1000

        if self.ftp and not self.ftp_memory:
            # NOTE(review): only the last fetched flow's path is forwarded here — confirm `_write_ftp` semantics.
            output = self._write_ftp(output, local_file_path, last_run_finish_ms)

        self._clear_cache(flow_ids)
        return True, is_satisfied, output

    def _clear_cache(self, flow_ids):
        """Drops cached data (and signals, when a signal topic exists) for the processed flow IDs."""
        for flow_id in flow_ids:
            del self.cached_data[flow_id]
            if self.topic_in_signal:
                del self.cached_signals[flow_id]

    @staticmethod
    def _send_async_callback(res, msg_id):
        # FIX: the original definition had neither `self` nor @staticmethod, so Pulsar's (res, msg_id)
        # invocation of the bound method raised TypeError; it also passed logging-style %s placeholders
        # to print() without interpolating them.
        print('Message published res=%s msg_id=%s' % (res, msg_id))

    def __iter__(self):
        return self

    def __next__(self):
        """Receives one batch of messages, caches them per flow, runs the task, and routes the outputs."""
        messages = self.consumer.batch_receive()
        while len(messages) == 0:
            # batch_receive() returns an empty list on timeout; keep polling until a batch arrives.
            messages = self.consumer.batch_receive()

        flow_ids = set()
        for msg in messages:

            flow_id, payload = self.network_codec.decode(msg.data())
            actual_topic_in = msg.topic_name().split('/')[-1]

            if actual_topic_in == self.topic_in_signal or msg.topic_name() == self.topic_in_signal:
                # We locally cache the prediction result ("signal") from another model.
                self.cached_signals[flow_id] = self.gate_in_signal(payload)
            elif actual_topic_in == self.topic_in_data or msg.topic_name() == self.topic_in_data:
                self.cached_data[flow_id] = self.gate_in_data(payload)
            else:
                # FIX: the original passed two positional args to ValueError, producing a tuple-shaped message.
                raise ValueError("The consumer's topic name does not match that of incoming message. "
                                 "The topic of incoming message is " + msg.topic_name())

            flow_ids.add(flow_id)

            if self.log_path and os.path.isdir(self.log_path):
                self._log_incoming_msg(flow_id, msg)

            self.latest_msg_consumed_time_ms[flow_id] = time.time() * 1000

        is_performed, is_satisfied, outputs = self._try_tasks(flow_ids)
        if not is_performed:
            return None

        do_return = False
        for flow_id in outputs.keys():
            if not outputs[flow_id]:
                continue
            do_return = True
            output = self.gate_out_destination(outputs[flow_id]) if is_satisfied[flow_id] else self.gate_out_signal(
                outputs[flow_id])
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            if is_satisfied[flow_id]:
                self.producer_destination.send_async(self.network_codec.encode([flow_id, output]),
                                                     self._send_async_callback)
            elif self.topic_out_signal:
                self.producer_signal.send_async(self.network_codec.encode([flow_id, output]), self._send_async_callback)
            else:
                raise ValueError("The result is not satisfactory but output signal topic is not present.")

        if self.acknowledge:
            for message in messages:
                self.consumer.acknowledge(message)

        return outputs if do_return else None
A model that takes in two topics and outputs to two topics. For input topics, one is the original data stream and the other is an intermittent signal stream. The original data stream is cached locally, waiting to be served. Actual model serving is only performed when a request is received from the intermittent signal stream. The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO; otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
BatchModel( task: Callable, pulsar_node: str, topic_in_data: str, topic_out_destination: str, topic_in_signal: Optional[str] = None, topic_out_signal: Optional[str] = None, gate_in_data: Optional[Callable] = None, gate_out_destination: Optional[Callable] = None, gate_in_signal: Optional[Callable] = None, gate_out_signal: Optional[Callable] = None, header_size: Optional[int] = None, ftp: Optional[bool] = False, ftp_memory: Optional[bool] = False, ftp_delete: Optional[bool] = False, local_ftp_path: Optional[str] = '/srv/ftp/', max_time_diff_ms: Optional[int] = 10000, no_overlap: Optional[bool] = False, min_interval_ms: Optional[int] = 0, log_path: Optional[str] = None, log_filename: Optional[str] = None, acknowledge: Optional[bool] = False, batch_max_num_msg: Optional[int] = 100, batch_max_bytes: Optional[int] = 10485760, batch_timeout_ms: Optional[int] = 10)
    def __init__(self,
                 task: Callable,
                 pulsar_node: str,
                 topic_in_data: str,
                 topic_out_destination: str,
                 topic_in_signal: Optional[str] = None,
                 topic_out_signal: Optional[str] = None,
                 gate_in_data: Optional[Callable] = None,
                 gate_out_destination: Optional[Callable] = None,
                 gate_in_signal: Optional[Callable] = None,
                 gate_out_signal: Optional[Callable] = None,
                 header_size: Optional[int] = None,
                 ftp: Optional[bool] = False,
                 ftp_memory: Optional[bool] = False,
                 ftp_delete: Optional[bool] = False,
                 local_ftp_path: Optional[str] = '/srv/ftp/',
                 max_time_diff_ms: Optional[int] = 10 * 1000,
                 no_overlap: Optional[bool] = False,
                 min_interval_ms: Optional[int] = 0,
                 log_path: Optional[str] = None,
                 log_filename: Optional[str] = None,
                 acknowledge: Optional[bool] = False,
                 batch_max_num_msg: Optional[int] = 100,
                 batch_max_bytes: Optional[int] = 10 * 1024 * 1024,
                 batch_timeout_ms: Optional[int] = 10) -> None:
        """Initializes a model.

        Args:
            task: The actual task to be performed, in the format of a callable method.
                Arg:
                    batch_input: a dict of (flow_id: int -> (data, signal)), or (flow_id -> data) if no signal present.
                Returns:
                    is_satisfied: a dict of (flow_id: int -> bool) indicating whether each flow_id is satisfied.
                    output: a dict of (flow_id: int -> bytes) indicating model output for each flow_id.
            pulsar_node: Address of Apache Pulsar server.
            topic_in_data: Pulsar topic of the input data stream.
            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
            topic_out_destination: Pulsar topic of the output destination stream.
            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
            gate_in_data: The gating function applied to input data stream (but not the signal stream).
            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
            gate_in_signal: The gating function applied to input signal stream.
            gate_out_signal: The gating function applied to output signal stream.
            header_size: Message header size, forwarded to the parent `Model` constructor.
            ftp: When set to `True`, lazy routing mode is enabled.
            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
            no_overlap: When set to `True`, we ensure that every message is at most processed once.
            min_interval_ms: The minimum time interval between two consecutive runs.
            log_path: Path to store the replay log. When set to `None`, log is disabled.
            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
            acknowledge: When set to `True`, consumed messages are acknowledged (forwarded to the parent `Model`).
            batch_max_num_msg: The maximum number of messages to include in a single batch.
            batch_max_bytes: The maximum size of messages to include in a single batch.
            batch_timeout_ms: The maximum timeout to wait (in ms) for enough messages for this batch.
        """
        super().__init__(task, pulsar_node, topic_in_data, topic_out_destination, topic_in_signal, topic_out_signal,
                         gate_in_data, gate_out_destination, gate_in_signal, gate_out_signal, header_size, ftp,
                         ftp_memory, ftp_delete, local_ftp_path, max_time_diff_ms, no_overlap, min_interval_ms,
                         log_path, log_filename, acknowledge)
        self.batch_max_num_msg = batch_max_num_msg
        self.batch_max_bytes = batch_max_bytes
        self.batch_timeout_ms = batch_timeout_ms
Initializes a model.
Arguments:
- task: The actual task to be performed, in the format of a callable method. Arg: batch_input: a dict of (flow_id: int -> (data, signal)), or (flow_id -> data) if no signal present. Returns: is_satisfied: a dict of (flow_id: int -> bool) indicating whether each flow_id is satisfied. output: a dict of (flow_id: int -> bytes) indicating model output for each flow_id.
- pulsar_node: Address of Apache Pulsar server.
- topic_in_data: Pulsar topic of the input data stream.
- topic_in_signal: Pulsar topic of the input signal stream. When set to
None
, no signal stream is present. - topic_out_destination: Pulsar topic of the output destination stream.
- topic_out_signal: Pulsar topic of the output signal stream. When set to
None
, no signal stream is present. - gate_in_data: The gating function applied to input data stream (but not the signal stream).
- gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
- gate_in_signal: The gating function applied to input signal stream.
- gate_out_signal: The gating function applied to output signal stream.
- ftp: When set to
True
, lazy routing mode is enabled. - ftp_memory: When set to
True
, in-memory lazy routing mode is enabled. Only effective when `ftp=True`
. - ftp_delete: When set to
True
, delete remote data after fetching is complete. Only effective when `ftp=True`
. - local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
- max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
- no_overlap: When set to
True
, we ensure that every message is at most processed once. - min_interval_ms: The minimum time interval between two consecutive runs.
- log_path: Path to store the replay log. When set to
None
, log is disabled. - log_filename: File name of replay log. When set to
None
, the current timestamp is used as file name. - batch_max_num_msg: The maximum number of messages to include in a single batch.
- batch_max_bytes: The maximum size of messages to include in a single batch.
- batch_timeout_ms: The maximum timeout to wait (in ms) for enough messages for this batch.
Inherited Members
- edgeserve.model.Model
- client
- producer_destination
- topic_out_signal
- task
- topic_in_data
- topic_in_signal
- consumer
- gate_in_data
- gate_out_destination
- gate_in_signal
- gate_out_signal
- model_id
- ftp
- local_ftp_path
- ftp_memory
- ftp_delete
- cached
- latest_msg_uuid
- latest_msg_publish_time_ms
- latest_msg_consumed_time_ms
- max_time_diff_ms
- no_overlap
- min_interval_ms
- last_run_start_ms
- log_path
- log_filename
- last_log_duration_ms
- acknowledge
- receiver_queue_size
- header_size
- graph_codec
- cached_data
- cached_signals