edgeserve.batch_compute

  1import os
  2import time
  3import pulsar
  4import pickle
  5import pathlib
  6from _pulsar import InitialPosition, ConsumerType
  7from inspect import signature
  8
  9from edgeserve.util import ftp_fetch, local_to_global_path
 10
 11
 12# This version intentionally drops schema support. Data sources cannot be combined adaptively.
 13class BatchCompute:
 14    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
 15                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
 16                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
 17                 acknowledge=False, batch_max_num_msg=100, batch_max_bytes=10 * 1024 * 1024, batch_timeout_ms=10):
 18        self.client = pulsar.Client(pulsar_node)
 19        self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())
 20        self.consumer = self.client.subscribe(topic_in, subscription_name='compute-sub',
 21                                              consumer_type=ConsumerType.Shared,
 22                                              schema=pulsar.schema.BytesSchema(),
 23                                              initial_position=InitialPosition.Earliest,
 24                                              batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(batch_max_num_msg,
 25                                                                                                     batch_max_bytes,
 26                                                                                                     batch_timeout_ms))
 27        self.task = task
 28        self.worker_id = worker_id
 29        self.gate_in = (lambda x: x) if gate_in is None else gate_in
 30        self.gate_out = (lambda x: x) if gate_out is None else gate_out
 31        self.ftp = ftp  # consider changing this name to ftp_in
 32        self.local_ftp_path = local_ftp_path
 33        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
 34        self.ftp_delete = ftp_delete
 35        self.max_time_diff_ms = max_time_diff_ms
 36        self.no_overlap = no_overlap
 37        self.min_interval_ms = min_interval_ms  # prediction frequency
 38        self.last_run_start_ms = 0
 39        self.log_path = log_path
 40        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
 41        self.last_log_duration_ms = -1
 42        self.acknowledge = acknowledge
 43
 44    def __enter__(self):
 45        return self
 46
 47    def __exit__(self, exc_type, exc_val, exc_tb):
 48        self.client.close()
 49
 50    def _try_task(self, msgs):
 51        # Avoid running too frequently for expensive tasks
 52        if time.time() * 1000 < self.last_run_start_ms + self.min_interval_ms:
 53            return False, None
 54
 55        self.last_run_start_ms = time.time() * 1000
 56
 57        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
 58        if self.ftp:
 59            for i, msg in enumerate(msgs):
 60                if not msg.startswith('ftp://'):
 61                    raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
 62                local_file_path = ftp_fetch(msg, self.local_ftp_path, memory=self.ftp_memory, delete=self.ftp_delete)
 63                msgs[i] = local_file_path
 64
 65        output = self.task(msgs)
 66        last_run_finish_ms = time.time() * 1000
 67
 68        # Filename: for now, use the completion timestamp as the filename of output FTP file
 69        # Batching: for now, we batch the output of messages received in batch in case of lazy data routing.
 70        if self.ftp and not self.ftp_memory:
 71            ftp_output_dir = os.path.join(os.path.dirname(local_file_path), 'ftp_output')
 72            pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
 73            with open(os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp', 'w') as f:
 74                f.write(output)
 75            output = os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp'
 76            output = local_to_global_path(output, self.local_ftp_path)
 77
 78        return True, output
 79
 80    def __iter__(self):
 81        return self
 82
 83    def __next__(self):
 84        messages = self.consumer.batch_receive()
 85        while len(messages) == 0:
 86            messages = self.consumer.batch_receive()
 87        msgs = [self.gate_in(msg.data()) for msg in messages]
 88
 89        ret, output = self._try_task(msgs)
 90        if ret:
 91            output = self.gate_out(output)
 92
 93        if output:
 94            if self.log_path and os.path.isdir(self.log_path):
 95                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
 96                    pickle.dump(output, f)
 97            self.producer.send(output)
 98
 99        if self.acknowledge:
100            for message in messages:
101                self.consumer.acknowledge(message)
102
103        return output if output else None
class BatchCompute:
    """Iterator that consumes message batches from Pulsar, runs a task, and publishes the result.

    Each ``next()`` call blocks until a non-empty batch arrives on ``topic_in``,
    passes every payload through ``gate_in``, runs ``task`` on the resulting
    list, passes the task output through ``gate_out`` and, when that output is
    truthy, sends it to ``topic_out``.

    The instance is also a context manager; leaving the ``with`` block closes
    the Pulsar client.
    """

    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
                 acknowledge=False, batch_max_num_msg=100, batch_max_bytes=10 * 1024 * 1024, batch_timeout_ms=10):
        """Connect to Pulsar and create the producer and the batch consumer.

        Args:
            task: Callable invoked with the list of gated payloads of one batch.
            pulsar_node: Service URL handed to ``pulsar.Client``.
            worker_id: Stored on the instance; not read anywhere else in this class.
            gate_in: Callable applied to every raw payload; identity when ``None``.
            gate_out: Callable applied to the task output; identity when ``None``.
            ftp: When true, every payload must be an ``ftp://`` URL and is fetched
                with ``ftp_fetch`` before the task runs.
            ftp_memory: Forwarded to ``ftp_fetch`` as ``memory``. When false (and
                ``ftp`` is true) the task output is written to a file and the
                file's global path is published instead of the output itself.
            ftp_delete: Forwarded to ``ftp_fetch`` as ``delete``.
            local_ftp_path: Local FTP root handed to ``ftp_fetch`` and
                ``local_to_global_path``.
            topic_in: Topic the consumer subscribes to.
            topic_out: Topic the producer publishes to.
            max_time_diff_ms: Stored on the instance; not read anywhere else in this class.
            no_overlap: Stored on the instance; not read anywhere else in this class.
            min_interval_ms: Minimum time between two task runs; a batch that
                arrives sooner is skipped without running the task.
            log_path: Directory in which published outputs are appended as pickles.
                Logging is skipped when it is unset or not an existing directory.
            log_filename: Base name of the log file; defaults to the construction
                timestamp in milliseconds.
            acknowledge: When true, every received message is acknowledged at the
                end of ``__next__``.
            batch_max_num_msg: First positional argument of
                ``pulsar.ConsumerBatchReceivePolicy``.
            batch_max_bytes: Second positional argument of
                ``pulsar.ConsumerBatchReceivePolicy``.
            batch_timeout_ms: Third positional argument of
                ``pulsar.ConsumerBatchReceivePolicy``.
        """
        self.client = pulsar.Client(pulsar_node)
        try:
            self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())
            self.consumer = self.client.subscribe(
                topic_in,
                subscription_name='compute-sub',
                consumer_type=ConsumerType.Shared,
                schema=pulsar.schema.BytesSchema(),
                initial_position=InitialPosition.Earliest,
                batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(batch_max_num_msg,
                                                                       batch_max_bytes,
                                                                       batch_timeout_ms))
        except Exception:
            # Do not leak the client connection when the producer or the
            # consumer cannot be created; the caller never gets an instance
            # it could close itself.
            self.client.close()
            raise
        self.task = task
        self.worker_id = worker_id
        self.gate_in = (lambda x: x) if gate_in is None else gate_in
        self.gate_out = (lambda x: x) if gate_out is None else gate_out
        self.ftp = ftp  # consider changing this name to ftp_in
        self.local_ftp_path = local_ftp_path
        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
        self.ftp_delete = ftp_delete
        self.max_time_diff_ms = max_time_diff_ms
        self.no_overlap = no_overlap
        self.min_interval_ms = min_interval_ms  # prediction frequency
        self.last_run_start_ms = 0
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        self.last_log_duration_ms = -1
        self.acknowledge = acknowledge

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the Pulsar client; exceptions from the ``with`` body are not suppressed."""
        self.client.close()

    def _try_task(self, msgs):
        """Run the task on one batch unless the minimum interval has not elapsed.

        Args:
            msgs: List of gated payloads. In FTP mode each payload must be an
                ``ftp://`` URL, given either as ``str`` or as UTF-8 ``bytes``.
                The list passed in is not modified.

        Returns:
            ``(False, None)`` when the call is skipped because of
            ``min_interval_ms``; otherwise ``(True, output)`` where ``output`` is
            the task result or, when ``ftp`` is true and ``ftp_memory`` is false,
            the global path of the file the result was written to.

        Raises:
            ValueError: FTP mode is enabled and a payload is not an ``ftp://`` URL.
        """
        # Avoid running too frequently for expensive tasks
        now_ms = time.time() * 1000
        if now_ms < self.last_run_start_ms + self.min_interval_ms:
            return False, None

        self.last_run_start_ms = now_ms

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        last_fetched = None
        if self.ftp:
            fetched = []
            for msg in msgs:
                # Payloads read with BytesSchema are bytes unless gate_in decoded
                # them; comparing bytes against a str prefix raises TypeError.
                if isinstance(msg, (bytes, bytearray)):
                    msg = msg.decode('utf-8')
                if not msg.startswith('ftp://'):
                    raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
                last_fetched = ftp_fetch(msg, self.local_ftp_path, memory=self.ftp_memory, delete=self.ftp_delete)
                fetched.append(last_fetched)
            # Build a new list instead of overwriting the caller's list in place.
            msgs = fetched

        output = self.task(msgs)
        last_run_finish_ms = time.time() * 1000

        # Filename: for now, use the completion timestamp as the filename of output FTP file
        # Batching: for now, we batch the output of messages received in batch in case of lazy data routing.
        if self.ftp and not self.ftp_memory:
            # The output goes next to the last fetched input; with an empty batch
            # there is no such file, so fall back to the local FTP root.
            base_dir = os.path.dirname(last_fetched) if last_fetched is not None else self.local_ftp_path
            ftp_output_dir = os.path.join(base_dir, 'ftp_output')
            pathlib.Path(ftp_output_dir).mkdir(parents=True, exist_ok=True)
            output_path = os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp'
            with open(output_path, 'w') as f:
                f.write(output)
            output = local_to_global_path(output_path, self.local_ftp_path)

        return True, output

    def __iter__(self):
        """Return the instance itself; batches are produced by ``__next__``."""
        return self

    def __next__(self):
        """Receive one non-empty batch, process it and publish the result.

        Returns:
            The gated task output when it is truthy, otherwise ``None`` (this
            includes batches skipped because of ``min_interval_ms``).
        """
        # batch_receive may come back empty; keep polling until a batch has data.
        messages = self.consumer.batch_receive()
        while not messages:
            messages = self.consumer.batch_receive()
        msgs = [self.gate_in(msg.data()) for msg in messages]

        ret, output = self._try_task(msgs)
        if ret:
            output = self.gate_out(output)

        if output:
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            self.producer.send(output)

        # Messages are acknowledged even when the task was skipped or produced
        # no output, so they are not processed again.
        if self.acknowledge:
            for message in messages:
                self.consumer.acknowledge(message)

        return output if output else None
BatchCompute( task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False, ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst', max_time_diff_ms=10000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None, acknowledge=False, batch_max_num_msg=100, batch_max_bytes=10485760, batch_timeout_ms=10)
15    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
16                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
17                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
18                 acknowledge=False, batch_max_num_msg=100, batch_max_bytes=10 * 1024 * 1024, batch_timeout_ms=10):
19        self.client = pulsar.Client(pulsar_node)
20        self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())
21        self.consumer = self.client.subscribe(topic_in, subscription_name='compute-sub',
22                                              consumer_type=ConsumerType.Shared,
23                                              schema=pulsar.schema.BytesSchema(),
24                                              initial_position=InitialPosition.Earliest,
25                                              batch_receive_policy=pulsar.ConsumerBatchReceivePolicy(batch_max_num_msg,
26                                                                                                     batch_max_bytes,
27                                                                                                     batch_timeout_ms))
28        self.task = task
29        self.worker_id = worker_id
30        self.gate_in = (lambda x: x) if gate_in is None else gate_in
31        self.gate_out = (lambda x: x) if gate_out is None else gate_out
32        self.ftp = ftp  # consider changing this name to ftp_in
33        self.local_ftp_path = local_ftp_path
34        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
35        self.ftp_delete = ftp_delete
36        self.max_time_diff_ms = max_time_diff_ms
37        self.no_overlap = no_overlap
38        self.min_interval_ms = min_interval_ms  # prediction frequency
39        self.last_run_start_ms = 0
40        self.log_path = log_path
41        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
42        self.last_log_duration_ms = -1
43        self.acknowledge = acknowledge
client
producer
consumer
task
worker_id
gate_in
gate_out
ftp
local_ftp_path
ftp_memory
ftp_delete
max_time_diff_ms
no_overlap
min_interval_ms
last_run_start_ms
log_path
log_filename
last_log_duration_ms
acknowledge