edgeserve.batch_model

  1import os
  2import time
  3from typing import Callable, Optional, Tuple, Set
  4
  5import pulsar
  6import pickle
  7import pathlib
  8from _pulsar import InitialPosition, ConsumerType
  9
 10from edgeserve.util import ftp_fetch, local_to_global_path
 11from edgeserve.message_format import NetworkCodec
 12from edgeserve.model import Model
 13
 14
 15# This version intentionally drops schema support. Data sources cannot be combined adaptively.
 16class BatchModel(Model):
 17    """A model that takes in two topics and outputs to two topics.
 18    For input topics, one is the original data stream and the other is an intermittent signal stream.
 19    The original data stream is cached locally, waiting to be served.
 20    Actual model serving is only performed when a request is received from the intermittent signal stream.
 21    The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO;
 22    otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
 23    """
 24
 25    def __init__(self,
 26                 task: Callable,
 27                 pulsar_node: str,
 28                 topic_in_data: str,
 29                 topic_out_destination: str,
 30                 topic_in_signal: Optional[str] = None,
 31                 topic_out_signal: Optional[str] = None,
 32                 gate_in_data: Optional[Callable] = None,
 33                 gate_out_destination: Optional[Callable] = None,
 34                 gate_in_signal: Optional[Callable] = None,
 35                 gate_out_signal: Optional[Callable] = None,
 36                 header_size: Optional[int] = None,
 37                 ftp: Optional[bool] = False,
 38                 ftp_memory: Optional[bool] = False,
 39                 ftp_delete: Optional[bool] = False,
 40                 local_ftp_path: Optional[str] = '/srv/ftp/',
 41                 max_time_diff_ms: Optional[int] = 10 * 1000,
 42                 no_overlap: Optional[bool] = False,
 43                 min_interval_ms: Optional[int] = 0,
 44                 log_path: Optional[str] = None,
 45                 log_filename: Optional[str] = None,
 46                 acknowledge: Optional[bool] = False,
 47                 batch_max_num_msg: Optional[int] = 100,
 48                 batch_max_bytes: Optional[int] = 10 * 1024 * 1024,
 49                 batch_timeout_ms: Optional[int] = 10) -> None:
 50        """Initializes a model.
 51
 52        Args:
 53            task: The actual task to be performed, in the format of a callable method.
 54                Arg:
 55                    batch_input: a dict of (flow_id: int -> (data, signal)), or (flow_id -> data) if no signal present.
 56                Returns:
 57                    is_satisfied: a dict of (flow_id: int -> bool) indicating whether each flow_id is satisfied.
 58                    output: a dict of (flow_id: int -> bytes) indicating model output for each flow_id.
 59            pulsar_node: Address of Apache Pulsar server.
 60            topic_in_data: Pulsar topic of the input data stream.
 61            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
 62            topic_out_destination: Pulsar topic of the output destination stream.
 63            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
 64            gate_in_data: The gating function applied to input data stream (but not the signal stream).
 65            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
 66            gate_in_signal: The gating function applied to input signal stream.
 67            gate_out_signal: The gating function applied to output signal stream.
 68            ftp: When set to `True`, lazy routing mode is enabled.
 69            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
 70            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
 71            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
 72            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
 73            no_overlap: When set to `True`, we ensure that every message is at most processed once.
 74            min_interval_ms: The minimum time interval between two consecutive runs.
 75            log_path: Path to store the replay log. When set to `None`, log is disabled.
 76            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
 77            batch_max_num_msg: The maximum number of messages to include in a single batch.
 78            batch_max_bytes: The maximum size of messages to include in a single batch.
 79            batch_timeout_ms: The maximum timeout to wait (in ms) for enough messages for this batch.
 80        """
 81        super().__init__(task, pulsar_node, topic_in_data, topic_out_destination, topic_in_signal, topic_out_signal,
 82                         gate_in_data, gate_out_destination, gate_in_signal, gate_out_signal, header_size, ftp,
 83                         ftp_memory, ftp_delete, local_ftp_path, max_time_diff_ms, no_overlap, min_interval_ms,
 84                         log_path, log_filename, acknowledge)
 85        self.batch_max_num_msg = batch_max_num_msg
 86        self.batch_max_bytes = batch_max_bytes
 87        self.batch_timeout_ms = batch_timeout_ms
 88
 89    def _subscribe(self):
 90        if self.topic_in_signal:
 91            return self.client.subscribe([self.topic_in_data, self.topic_in_signal],
 92                                         subscription_name='compute-sub',
 93                                         consumer_type=ConsumerType.Shared,
 94                                         schema=pulsar.schema.BytesSchema(),
 95                                         initial_position=InitialPosition.Earliest,
 96                                         batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(self.batch_max_num_msg,
 97                                                                                                self.batch_max_bytes,
 98                                                                                                self.batch_timeout_ms))
 99
100        return self.client.subscribe(self.topic_in_data, subscription_name='compute-sub',
101                                     consumer_type=ConsumerType.Shared, schema=pulsar.schema.BytesSchema(),
102                                     initial_position=InitialPosition.Earliest,
103                                     batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(self.batch_max_num_msg,
104                                                                                            self.batch_max_bytes,
105                                                                                            self.batch_timeout_ms))
106
107    def __enter__(self):
108        return self
109
110    def __exit__(self, exc_type, exc_val, exc_tb):
111        self.client.close()
112
113    def _try_tasks(self, flow_ids: Set[int]) -> Tuple[bool, Optional[dict], Optional[dict]]:
114        """Performs actual computation.
115
116        Args:
117            flow_ids: Set of IDs of the data flows to be processed.
118
119        Returns:
120            is_performed: Whether the job is actually performed.
121            is_satisfied: Whether the prediction is considered satisfied for each flow_id in a (flow_id -> bool) dict.
122                          If set to `True`, the result is sent to the destination topic;
123                          if set to `False`, the result is sent to a more expensive model.
124            outputs: The prediction results in the format of dict of (flow_id -> bytes).
125        """
126        # Avoid running too frequently for expensive tasks
127        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
128            return False, None, None
129
130        self.last_run_start_ms = time.time() * 1000
131
132        # When `topic_in_signal` is present, we only do actual computation when both data stream and signal stream
133        # from the same `flow_id` are present.
134        if self.topic_in_signal:
135            flow_ids = flow_ids & self.cached_data.keys() & self.cached_signals.keys()
136
137        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
138        if self.ftp:
139            for flow_id in flow_ids:
140                if not self.cached_data[flow_id].startswith('ftp://'):
141                    raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
142                local_file_path = ftp_fetch(self.cached_data[flow_id], self.local_ftp_path, memory=self.ftp_memory,
143                                            delete=self.ftp_delete)
144                self.cached_data[flow_id] = local_file_path
145
146        cached_data_filtered = {flow_id: self.cached_data[flow_id] for flow_id in flow_ids}
147        if self.topic_in_signal:
148            cached_tuple_filtered = {flow_id: (self.cached_data[flow_id],
149                                               self.cached_signals[flow_id]) for flow_id in flow_ids}
150            is_satisfied, output = self.task(cached_tuple_filtered)
151        else:
152            is_satisfied, output = self.task(cached_data_filtered)
153        last_run_finish_ms = time.time() * 1000
154
155        if self.ftp and not self.ftp_memory:
156            output = self._write_ftp(output, local_file_path, last_run_finish_ms)
157
158        self._clear_cache(flow_ids)
159        return True, is_satisfied, output
160
161    def _clear_cache(self, flow_ids):
162        for flow_id in flow_ids:
163            del self.cached_data[flow_id]
164            if self.topic_in_signal:
165                del self.cached_signals[flow_id]
166
167    def _send_async_callback(res, msg_id):
168        print('Message published res=%s msg_id=%s', res, msg_id)
169
170    def __iter__(self):
171        return self
172
173    def __next__(self):
174        messages = self.consumer.batch_receive()
175        while len(messages) == 0:
176            messages = self.consumer.batch_receive()
177
178        flow_ids = set()
179        for msg in messages:
180
181            flow_id, payload = self.network_codec.decode(msg.data())
182            actual_topic_in = msg.topic_name().split('/')[-1]
183
184            if actual_topic_in == self.topic_in_signal or msg.topic_name() == self.topic_in_signal:
185                # We locally cache the prediction result ("signal") from another model.
186                self.cached_signals[flow_id] = self.gate_in_signal(payload)
187            elif actual_topic_in == self.topic_in_data or msg.topic_name() == self.topic_in_data:
188                self.cached_data[flow_id] = self.gate_in_data(payload)
189            else:
190                raise ValueError(
191                    "The consumer's topic name does not match that of incoming message. "
192                    "The topic of incoming message is", msg.topic_name())
193
194            flow_ids.add(flow_id)
195
196            if self.log_path and os.path.isdir(self.log_path):
197                self._log_incoming_msg(flow_id, msg)
198
199            self.latest_msg_consumed_time_ms[flow_id] = time.time() * 1000
200
201        is_performed, is_satisfied, outputs = self._try_tasks(flow_ids)
202        if not is_performed:
203            return None
204
205        do_return = False
206        for flow_id in outputs.keys():
207            if not outputs[flow_id]:
208                continue
209            do_return = True
210            output = self.gate_out_destination(outputs[flow_id]) if is_satisfied[flow_id] else self.gate_out_signal(
211                outputs[flow_id])
212            if self.log_path and os.path.isdir(self.log_path):
213                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
214                    pickle.dump(output, f)
215            if is_satisfied[flow_id]:
216                self.producer_destination.send_async(self.network_codec.encode([flow_id, output]),
217                                                     self._send_async_callback)
218            elif self.topic_out_signal:
219                self.producer_signal.send_async(self.network_codec.encode([flow_id, output]), self._send_async_callback)
220            else:
221                raise ValueError("The result is not satisfactory but output signal topic is not present.")
222
223        if self.acknowledge:
224            for message in messages:
225                self.consumer.acknowledge(message)
226
227        return outputs if do_return else None
class BatchModel(Model):
    """A model that takes in two topics and outputs to two topics.
    For input topics, one is the original data stream and the other is an intermittent signal stream.
    The original data stream is cached locally, waiting to be served.
    Actual model serving is only performed when a request is received from the intermittent signal stream.
    The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO;
    otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.
    """

    def __init__(self,
                 task: Callable,
                 pulsar_node: str,
                 topic_in_data: str,
                 topic_out_destination: str,
                 topic_in_signal: Optional[str] = None,
                 topic_out_signal: Optional[str] = None,
                 gate_in_data: Optional[Callable] = None,
                 gate_out_destination: Optional[Callable] = None,
                 gate_in_signal: Optional[Callable] = None,
                 gate_out_signal: Optional[Callable] = None,
                 header_size: Optional[int] = None,
                 ftp: Optional[bool] = False,
                 ftp_memory: Optional[bool] = False,
                 ftp_delete: Optional[bool] = False,
                 local_ftp_path: Optional[str] = '/srv/ftp/',
                 max_time_diff_ms: Optional[int] = 10 * 1000,
                 no_overlap: Optional[bool] = False,
                 min_interval_ms: Optional[int] = 0,
                 log_path: Optional[str] = None,
                 log_filename: Optional[str] = None,
                 acknowledge: Optional[bool] = False,
                 batch_max_num_msg: Optional[int] = 100,
                 batch_max_bytes: Optional[int] = 10 * 1024 * 1024,
                 batch_timeout_ms: Optional[int] = 10) -> None:
        """Initializes a model.

        Args:
            task: The actual task to be performed, in the format of a callable method.
                Arg:
                    batch_input: a dict of (flow_id: int -> (data, signal)), or (flow_id -> data) if no signal present.
                Returns:
                    is_satisfied: a dict of (flow_id: int -> bool) indicating whether each flow_id is satisfied.
                    output: a dict of (flow_id: int -> bytes) indicating model output for each flow_id.
            pulsar_node: Address of Apache Pulsar server.
            topic_in_data: Pulsar topic of the input data stream.
            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
            topic_out_destination: Pulsar topic of the output destination stream.
            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
            gate_in_data: The gating function applied to input data stream (but not the signal stream).
            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
            gate_in_signal: The gating function applied to input signal stream.
            gate_out_signal: The gating function applied to output signal stream.
            ftp: When set to `True`, lazy routing mode is enabled.
            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
            no_overlap: When set to `True`, we ensure that every message is at most processed once.
            min_interval_ms: The minimum time interval between two consecutive runs.
            log_path: Path to store the replay log. When set to `None`, log is disabled.
            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
            batch_max_num_msg: The maximum number of messages to include in a single batch.
            batch_max_bytes: The maximum size of messages to include in a single batch.
            batch_timeout_ms: The maximum timeout to wait (in ms) for enough messages for this batch.
        """
        super().__init__(task, pulsar_node, topic_in_data, topic_out_destination, topic_in_signal, topic_out_signal,
                         gate_in_data, gate_out_destination, gate_in_signal, gate_out_signal, header_size, ftp,
                         ftp_memory, ftp_delete, local_ftp_path, max_time_diff_ms, no_overlap, min_interval_ms,
                         log_path, log_filename, acknowledge)
        # Batch-receive knobs; consumed by _subscribe() below.
        self.batch_max_num_msg = batch_max_num_msg
        self.batch_max_bytes = batch_max_bytes
        self.batch_timeout_ms = batch_timeout_ms

    def _batch_receive_policy(self):
        """Builds the consumer batch-receive policy from the configured knobs."""
        return pulsar.ConsumerBatchReceivePolicy(self.batch_max_num_msg,
                                                 self.batch_max_bytes,
                                                 self.batch_timeout_ms)

    def _subscribe(self):
        """Subscribes to the input topic(s) with a shared, batch-receiving consumer.

        When a signal topic is configured, a single consumer subscribes to both
        the data topic and the signal topic so one batch_receive() drains both.
        """
        topics = ([self.topic_in_data, self.topic_in_signal] if self.topic_in_signal
                  else self.topic_in_data)
        return self.client.subscribe(topics,
                                     subscription_name='compute-sub',
                                     consumer_type=ConsumerType.Shared,
                                     schema=pulsar.schema.BytesSchema(),
                                     initial_position=InitialPosition.Earliest,
                                     batch_receive_policy=self._batch_receive_policy())

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Closing the client tears down its consumers and producers as well.
        self.client.close()

    def _try_tasks(self, flow_ids: Set[int]) -> Tuple[bool, Optional[dict], Optional[dict]]:
        """Performs actual computation.

        Args:
            flow_ids: Set of IDs of the data flows to be processed.

        Returns:
            is_performed: Whether the job is actually performed.
            is_satisfied: Whether the prediction is considered satisfied for each flow_id in a (flow_id -> bool) dict.
                          If set to `True`, the result is sent to the destination topic;
                          if set to `False`, the result is sent to a more expensive model.
            outputs: The prediction results in the format of dict of (flow_id -> bytes).
        """
        # Avoid running too frequently for expensive tasks
        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
            return False, None, None

        self.last_run_start_ms = time.time() * 1000

        # When `topic_in_signal` is present, we only do actual computation when both data stream and signal stream
        # from the same `flow_id` are present.
        if self.topic_in_signal:
            flow_ids = flow_ids & self.cached_data.keys() & self.cached_signals.keys()

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        # BUGFIX: `local_file_path` was previously unbound when `flow_ids` is empty,
        # which raised NameError at the `_write_ftp` call below.
        local_file_path = None
        if self.ftp:
            for flow_id in flow_ids:
                if not self.cached_data[flow_id].startswith('ftp://'):
                    raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
                local_file_path = ftp_fetch(self.cached_data[flow_id], self.local_ftp_path, memory=self.ftp_memory,
                                            delete=self.ftp_delete)
                self.cached_data[flow_id] = local_file_path

        cached_data_filtered = {flow_id: self.cached_data[flow_id] for flow_id in flow_ids}
        if self.topic_in_signal:
            cached_tuple_filtered = {flow_id: (self.cached_data[flow_id],
                                               self.cached_signals[flow_id]) for flow_id in flow_ids}
            is_satisfied, output = self.task(cached_tuple_filtered)
        else:
            is_satisfied, output = self.task(cached_data_filtered)
        last_run_finish_ms = time.time() * 1000

        if self.ftp and not self.ftp_memory and local_file_path is not None:
            # NOTE(review): only the path of the most recently fetched flow is passed on;
            # presumably `_write_ftp` derives the output location from it — confirm.
            output = self._write_ftp(output, local_file_path, last_run_finish_ms)

        self._clear_cache(flow_ids)
        return True, is_satisfied, output

    def _clear_cache(self, flow_ids):
        """Drops cached data (and signals, if a signal topic exists) for the given flows."""
        for flow_id in flow_ids:
            del self.cached_data[flow_id]
            if self.topic_in_signal:
                del self.cached_signals[flow_id]

    @staticmethod
    def _send_async_callback(res, msg_id):
        """Callback invoked by Pulsar when an async send completes.

        BUGFIX: this was an instance method declared without `self`, so the bound
        call `self._send_async_callback(res, msg_id)` passed three arguments to a
        two-parameter function and raised TypeError; it is now a staticmethod.
        The message is also %-formatted instead of passing the format string and
        values as separate print() arguments.
        """
        print('Message published res=%s msg_id=%s' % (res, msg_id))

    def __iter__(self):
        return self

    def __next__(self):
        """Receives one batch, caches each message, runs the task, and routes outputs."""
        # batch_receive() may return an empty batch on timeout; poll until we get data.
        messages = self.consumer.batch_receive()
        while len(messages) == 0:
            messages = self.consumer.batch_receive()

        flow_ids = set()
        for msg in messages:
            flow_id, payload = self.network_codec.decode(msg.data())
            # topic_name() may be fully qualified (tenant/namespace/topic); compare both forms.
            actual_topic_in = msg.topic_name().split('/')[-1]

            if actual_topic_in == self.topic_in_signal or msg.topic_name() == self.topic_in_signal:
                # We locally cache the prediction result ("signal") from another model.
                self.cached_signals[flow_id] = self.gate_in_signal(payload)
            elif actual_topic_in == self.topic_in_data or msg.topic_name() == self.topic_in_data:
                self.cached_data[flow_id] = self.gate_in_data(payload)
            else:
                raise ValueError(
                    "The consumer's topic name does not match that of incoming message. "
                    "The topic of incoming message is", msg.topic_name())

            flow_ids.add(flow_id)

            if self.log_path and os.path.isdir(self.log_path):
                self._log_incoming_msg(flow_id, msg)

            self.latest_msg_consumed_time_ms[flow_id] = time.time() * 1000

        is_performed, is_satisfied, outputs = self._try_tasks(flow_ids)
        if not is_performed:
            return None

        do_return = False
        for flow_id, raw_output in outputs.items():
            if not raw_output:
                continue
            do_return = True
            gate = self.gate_out_destination if is_satisfied[flow_id] else self.gate_out_signal
            output = gate(raw_output)
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            if is_satisfied[flow_id]:
                # Satisfied: publish to the final destination topic.
                self.producer_destination.send_async(self.network_codec.encode([flow_id, output]),
                                                     self._send_async_callback)
            elif self.topic_out_signal:
                # Not satisfied: escalate to the more expensive model via its signal topic.
                self.producer_signal.send_async(self.network_codec.encode([flow_id, output]), self._send_async_callback)
            else:
                raise ValueError("The result is not satisfactory but output signal topic is not present.")

        if self.acknowledge:
            for message in messages:
                self.consumer.acknowledge(message)

        return outputs if do_return else None

A model that takes in two topics and outputs to two topics. For input topics, one is the original data stream and the other is an intermittent signal stream. The original data stream is cached locally, waiting to be served. Actual model serving is only performed when a request is received from the intermittent signal stream. The model may choose to send its prediction to the destination topic, if it satisfies a user-defined SLO; otherwise, it sends the prediction to the intermittent signal stream of a more expensive model.

BatchModel( task: Callable, pulsar_node: str, topic_in_data: str, topic_out_destination: str, topic_in_signal: Optional[str] = None, topic_out_signal: Optional[str] = None, gate_in_data: Optional[Callable] = None, gate_out_destination: Optional[Callable] = None, gate_in_signal: Optional[Callable] = None, gate_out_signal: Optional[Callable] = None, header_size: Optional[int] = None, ftp: Optional[bool] = False, ftp_memory: Optional[bool] = False, ftp_delete: Optional[bool] = False, local_ftp_path: Optional[str] = '/srv/ftp/', max_time_diff_ms: Optional[int] = 10000, no_overlap: Optional[bool] = False, min_interval_ms: Optional[int] = 0, log_path: Optional[str] = None, log_filename: Optional[str] = None, acknowledge: Optional[bool] = False, batch_max_num_msg: Optional[int] = 100, batch_max_bytes: Optional[int] = 10485760, batch_timeout_ms: Optional[int] = 10)
26    def __init__(self,
27                 task: Callable,
28                 pulsar_node: str,
29                 topic_in_data: str,
30                 topic_out_destination: str,
31                 topic_in_signal: Optional[str] = None,
32                 topic_out_signal: Optional[str] = None,
33                 gate_in_data: Optional[Callable] = None,
34                 gate_out_destination: Optional[Callable] = None,
35                 gate_in_signal: Optional[Callable] = None,
36                 gate_out_signal: Optional[Callable] = None,
37                 header_size: Optional[int] = None,
38                 ftp: Optional[bool] = False,
39                 ftp_memory: Optional[bool] = False,
40                 ftp_delete: Optional[bool] = False,
41                 local_ftp_path: Optional[str] = '/srv/ftp/',
42                 max_time_diff_ms: Optional[int] = 10 * 1000,
43                 no_overlap: Optional[bool] = False,
44                 min_interval_ms: Optional[int] = 0,
45                 log_path: Optional[str] = None,
46                 log_filename: Optional[str] = None,
47                 acknowledge: Optional[bool] = False,
48                 batch_max_num_msg: Optional[int] = 100,
49                 batch_max_bytes: Optional[int] = 10 * 1024 * 1024,
50                 batch_timeout_ms: Optional[int] = 10) -> None:
51        """Initializes a model.
52
53        Args:
54            task: The actual task to be performed, in the format of a callable method.
55                Arg:
56                    batch_input: a dict of (flow_id: int -> (data, signal)), or (flow_id -> data) if no signal present.
57                Returns:
58                    is_satisfied: a dict of (flow_id: int -> bool) indicating whether each flow_id is satisfied.
59                    output: a dict of (flow_id: int -> bytes) indicating model output for each flow_id.
60            pulsar_node: Address of Apache Pulsar server.
61            topic_in_data: Pulsar topic of the input data stream.
62            topic_in_signal: Pulsar topic of the input signal stream. When set to `None`, no signal stream is present.
63            topic_out_destination: Pulsar topic of the output destination stream.
64            topic_out_signal: Pulsar topic of the output signal stream. When set to `None`, no signal stream is present.
65            gate_in_data: The gating function applied to input data stream (but not the signal stream).
66            gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
67            gate_in_signal: The gating function applied to input signal stream.
68            gate_out_signal: The gating function applied to output signal stream.
69            ftp: When set to `True`, lazy routing mode is enabled.
70            ftp_memory: When set to `True`, in-memory lazy routing mode is enabled. Only effective when `ftp=True`.
71            ftp_delete: When set to `True`, delete remote data after fetching is complete. Only effective when `ftp=True`.
72            local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
73            max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
74            no_overlap: When set to `True`, we ensure that every message is at most processed once.
75            min_interval_ms: The minimum time interval between two consecutive runs.
76            log_path: Path to store the replay log. When set to `None`, log is disabled.
77            log_filename: File name of replay log. When set to `None`, the current timestamp is used as file name.
78            batch_max_num_msg: The maximum number of messages to include in a single batch.
79            batch_max_bytes: The maximum size of messages to include in a single batch.
80            batch_timeout_ms: The maximum timeout to wait (in ms) for enough messages for this batch.
81        """
82        super().__init__(task, pulsar_node, topic_in_data, topic_out_destination, topic_in_signal, topic_out_signal,
83                         gate_in_data, gate_out_destination, gate_in_signal, gate_out_signal, header_size, ftp,
84                         ftp_memory, ftp_delete, local_ftp_path, max_time_diff_ms, no_overlap, min_interval_ms,
85                         log_path, log_filename, acknowledge)
86        self.batch_max_num_msg = batch_max_num_msg
87        self.batch_max_bytes = batch_max_bytes
88        self.batch_timeout_ms = batch_timeout_ms

Initializes a model.

Arguments:
  • task: The actual task to be performed, in the format of a callable method. Arg: batch_input: a dict of (flow_id: int -> (data, signal)), or (flow_id -> data) if no signal present. Returns: is_satisfied: a dict of (flow_id: int -> bool) indicating whether each flow_id is satisfied. output: a dict of (flow_id: int -> bytes) indicating model output for each flow_id.
  • pulsar_node: Address of Apache Pulsar server.
  • topic_in_data: Pulsar topic of the input data stream.
  • topic_in_signal: Pulsar topic of the input signal stream. When set to None, no signal stream is present.
  • topic_out_destination: Pulsar topic of the output destination stream.
  • topic_out_signal: Pulsar topic of the output signal stream. When set to None, no signal stream is present.
  • gate_in_data: The gating function applied to input data stream (but not the signal stream).
  • gate_out_destination: The gating function applied to output prediction stream (to the destination topic).
  • gate_in_signal: The gating function applied to input signal stream.
  • gate_out_signal: The gating function applied to output signal stream.
  • ftp: When set to True, lazy routing mode is enabled.
  • ftp_memory: When set to True, in-memory lazy routing mode is enabled. Only effective when ftp=True.
  • ftp_delete: When set to True, delete remote data after fetching is complete. Only effective when ftp=True.
  • local_ftp_path: The local FTP path served by an active FTP server. Other nodes fetch data from this path.
  • max_time_diff_ms: The maximum timestamp difference we tolerate between data sources to aggregate together.
  • no_overlap: When set to True, we ensure that every message is at most processed once.
  • min_interval_ms: The minimum time interval between two consecutive runs.
  • log_path: Path to store the replay log. When set to None, log is disabled.
  • log_filename: File name of replay log. When set to None, the current timestamp is used as file name.
  • batch_max_num_msg: The maximum number of messages to include in a single batch.
  • batch_max_bytes: The maximum size of messages to include in a single batch.
  • batch_timeout_ms: The maximum timeout to wait (in ms) for enough messages for this batch.
batch_max_num_msg
batch_max_bytes
batch_timeout_ms