edgeserve.batch_compute
import os
import time
import pulsar
import pickle
import pathlib
from _pulsar import InitialPosition, ConsumerType
from inspect import signature

from edgeserve.util import ftp_fetch, local_to_global_path


# This version intentionally drops schema support. Data sources cannot be combined adaptively.
class BatchCompute:
    """Iterator that runs ``task`` on batches of Pulsar messages and publishes the results.

    Each ``next()`` call receives one batch from ``topic_in``, passes every payload through
    ``gate_in``, hands the list to ``task``, passes the result through ``gate_out`` and, when
    the result is truthy, sends it to ``topic_out``.  The object is also a context manager
    that closes the Pulsar client on exit.
    """

    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
                 acknowledge=False, batch_max_num_msg=100, batch_max_bytes=10 * 1024 * 1024, batch_timeout_ms=10):
        """Connect to Pulsar and store the configuration.

        Args:
            task: callable invoked with the list of (gated) message payloads of one batch.
            pulsar_node: service URL passed to ``pulsar.Client``.
            worker_id: stored on the instance; not used by the code in this class.
            gate_in: callable applied to every incoming payload; identity when ``None``.
            gate_out: callable applied to the task output; identity when ``None``.
            ftp: when true, incoming payloads are ``ftp://`` URLs fetched via ``ftp_fetch``.
            ftp_memory: forwarded to ``ftp_fetch(memory=...)``; when false (and ``ftp`` is true)
                the task output is written to a file and its global path is published instead.
            ftp_delete: forwarded to ``ftp_fetch(delete=...)``.
            local_ftp_path: local directory handed to ``ftp_fetch`` / ``local_to_global_path``.
            topic_in: topic to consume from.
            topic_out: topic to publish to.
            max_time_diff_ms: stored on the instance; not used by the code in this class.
            no_overlap: stored on the instance; not used by the code in this class.
            min_interval_ms: minimum time between the starts of two task runs.
            log_path: directory in which outputs are pickled; logging is skipped when it is
                unset or not an existing directory.
            log_filename: base name of the log file; defaults to the current time in ms.
            acknowledge: when true, every received message is acknowledged after processing.
            batch_max_num_msg: first argument of ``pulsar.ConsumerBatchReceivePolicy``.
            batch_max_bytes: second argument of ``pulsar.ConsumerBatchReceivePolicy``.
            batch_timeout_ms: third argument of ``pulsar.ConsumerBatchReceivePolicy``.
        """
        self.client = pulsar.Client(pulsar_node)
        self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())

        receive_policy = pulsar.ConsumerBatchReceivePolicy(batch_max_num_msg, batch_max_bytes, batch_timeout_ms)
        self.consumer = self.client.subscribe(
            topic_in,
            subscription_name='compute-sub',
            consumer_type=ConsumerType.Shared,
            schema=pulsar.schema.BytesSchema(),
            initial_position=InitialPosition.Earliest,
            batch_receive_policy=receive_policy,
        )

        self.task = task
        self.worker_id = worker_id
        self.gate_in = gate_in if gate_in is not None else (lambda x: x)
        self.gate_out = gate_out if gate_out is not None else (lambda x: x)

        self.ftp = ftp  # consider changing this name to ftp_in
        self.local_ftp_path = local_ftp_path
        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
        self.ftp_delete = ftp_delete

        self.max_time_diff_ms = max_time_diff_ms
        self.no_overlap = no_overlap
        self.min_interval_ms = min_interval_ms  # prediction frequency
        self.last_run_start_ms = 0

        if log_filename is None:
            log_filename = str(time.time() * 1000)
        self.log_path = log_path
        self.log_filename = log_filename
        self.last_log_duration_ms = -1
        self.acknowledge = acknowledge

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the Pulsar client; exceptions from the ``with`` body are not suppressed."""
        self.client.close()

    def _try_task(self, msgs):
        """Run ``self.task`` on one batch unless the rate limit says to skip it.

        Args:
            msgs: list of gated payloads.  In FTP mode each entry must be an ``ftp://`` URL,
                given either as ``str`` or as UTF-8 ``bytes``; entries are replaced in place
                by whatever ``ftp_fetch`` returns.

        Returns:
            ``(False, None)`` when called less than ``min_interval_ms`` after the previous run
            started, otherwise ``(True, output)``.  With ``ftp`` set and ``ftp_memory`` unset,
            ``output`` is the global path of the file the task result was written to.

        Raises:
            ValueError: FTP mode is enabled and a message is not an ``ftp://`` URL.
        """
        # Avoid running too frequently for expensive tasks
        now_ms = time.time() * 1000
        if now_ms < self.last_run_start_ms + self.min_interval_ms:
            return False, None
        self.last_run_start_ms = now_ms

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        local_file_path = None
        if self.ftp:
            for i, msg in enumerate(msgs):
                # Payloads consumed with BytesSchema arrive as bytes unless gate_in decodes them;
                # str.startswith on bytes would raise TypeError, so normalise to str first.
                if isinstance(msg, (bytes, bytearray)):
                    try:
                        msg = bytes(msg).decode('utf-8')
                    except UnicodeDecodeError:
                        msg = None
                if not isinstance(msg, str) or not msg.startswith('ftp://'):
                    raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
                local_file_path = ftp_fetch(msg, self.local_ftp_path, memory=self.ftp_memory, delete=self.ftp_delete)
                msgs[i] = local_file_path

        output = self.task(msgs)
        last_run_finish_ms = time.time() * 1000

        # Filename: for now, use the completion timestamp as the filename of output FTP file
        # Batching: for now, we batch the output of messages received in batch in case of lazy data routing.
        if self.ftp and not self.ftp_memory:
            # The output goes next to the last fetched file.  An empty batch fetched nothing, so
            # fall back to the local FTP root instead of failing on an unbound variable.
            base_dir = self.local_ftp_path if local_file_path is None else os.path.dirname(local_file_path)
            ftp_output_dir = os.path.join(base_dir, 'ftp_output')
            pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
            output_path = os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp'
            with open(output_path, 'w') as f:
                f.write(output)
            output = local_to_global_path(output_path, self.local_ftp_path)

        return True, output

    def __iter__(self):
        """Return the instance itself; iteration never raises ``StopIteration`` here."""
        return self

    def __next__(self):
        """Process one batch and return the published output, or ``None`` if nothing was sent.

        Polls ``batch_receive`` until it yields a non-empty batch, runs the task through
        ``_try_task``, optionally pickles the output to the log file, publishes it and
        (when ``acknowledge`` is set) acknowledges every message of the batch.
        """
        messages = self.consumer.batch_receive()
        while len(messages) == 0:
            messages = self.consumer.batch_receive()
        msgs = [self.gate_in(message.data()) for message in messages]

        ret, output = self._try_task(msgs)
        if ret:
            output = self.gate_out(output)

        if output:
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            # The producer uses BytesSchema, which only accepts bytes; a str output (for
            # example the FTP path produced above) is encoded instead of failing in send().
            payload = output.encode('utf-8') if isinstance(output, str) else output
            self.producer.send(payload)

        # NOTE: messages are acknowledged even when the task was skipped by the rate limit.
        if self.acknowledge:
            for message in messages:
                self.consumer.acknowledge(message)

        return output if output else None
class BatchCompute:
class BatchCompute:
    """Iterator that runs ``task`` on batches of Pulsar messages and publishes the results.

    Each ``next()`` call receives one batch from ``topic_in``, passes every payload through
    ``gate_in``, hands the list to ``task``, passes the result through ``gate_out`` and, when
    the result is truthy, sends it to ``topic_out``.  The object is also a context manager
    that closes the Pulsar client on exit.
    """

    def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
                 ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
                 max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
                 acknowledge=False, batch_max_num_msg=100, batch_max_bytes=10 * 1024 * 1024, batch_timeout_ms=10):
        """Connect to Pulsar and store the configuration.

        Args:
            task: callable invoked with the list of (gated) message payloads of one batch.
            pulsar_node: service URL passed to ``pulsar.Client``.
            worker_id: stored on the instance; not used by the code in this class.
            gate_in: callable applied to every incoming payload; identity when ``None``.
            gate_out: callable applied to the task output; identity when ``None``.
            ftp: when true, incoming payloads are ``ftp://`` URLs fetched via ``ftp_fetch``.
            ftp_memory: forwarded to ``ftp_fetch(memory=...)``; when false (and ``ftp`` is true)
                the task output is written to a file and its global path is published instead.
            ftp_delete: forwarded to ``ftp_fetch(delete=...)``.
            local_ftp_path: local directory handed to ``ftp_fetch`` / ``local_to_global_path``.
            topic_in: topic to consume from.
            topic_out: topic to publish to.
            max_time_diff_ms: stored on the instance; not used by the code in this class.
            no_overlap: stored on the instance; not used by the code in this class.
            min_interval_ms: minimum time between the starts of two task runs.
            log_path: directory in which outputs are pickled; logging is skipped when it is
                unset or not an existing directory.
            log_filename: base name of the log file; defaults to the current time in ms.
            acknowledge: when true, every received message is acknowledged after processing.
            batch_max_num_msg: first argument of ``pulsar.ConsumerBatchReceivePolicy``.
            batch_max_bytes: second argument of ``pulsar.ConsumerBatchReceivePolicy``.
            batch_timeout_ms: third argument of ``pulsar.ConsumerBatchReceivePolicy``.
        """
        self.client = pulsar.Client(pulsar_node)
        self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())

        receive_policy = pulsar.ConsumerBatchReceivePolicy(batch_max_num_msg, batch_max_bytes, batch_timeout_ms)
        self.consumer = self.client.subscribe(
            topic_in,
            subscription_name='compute-sub',
            consumer_type=ConsumerType.Shared,
            schema=pulsar.schema.BytesSchema(),
            initial_position=InitialPosition.Earliest,
            batch_receive_policy=receive_policy,
        )

        self.task = task
        self.worker_id = worker_id
        self.gate_in = gate_in if gate_in is not None else (lambda x: x)
        self.gate_out = gate_out if gate_out is not None else (lambda x: x)

        self.ftp = ftp  # consider changing this name to ftp_in
        self.local_ftp_path = local_ftp_path
        self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
        self.ftp_delete = ftp_delete

        self.max_time_diff_ms = max_time_diff_ms
        self.no_overlap = no_overlap
        self.min_interval_ms = min_interval_ms  # prediction frequency
        self.last_run_start_ms = 0

        if log_filename is None:
            log_filename = str(time.time() * 1000)
        self.log_path = log_path
        self.log_filename = log_filename
        self.last_log_duration_ms = -1
        self.acknowledge = acknowledge

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the Pulsar client; exceptions from the ``with`` body are not suppressed."""
        self.client.close()

    def _try_task(self, msgs):
        """Run ``self.task`` on one batch unless the rate limit says to skip it.

        Args:
            msgs: list of gated payloads.  In FTP mode each entry must be an ``ftp://`` URL,
                given either as ``str`` or as UTF-8 ``bytes``; entries are replaced in place
                by whatever ``ftp_fetch`` returns.

        Returns:
            ``(False, None)`` when called less than ``min_interval_ms`` after the previous run
            started, otherwise ``(True, output)``.  With ``ftp`` set and ``ftp_memory`` unset,
            ``output`` is the global path of the file the task result was written to.

        Raises:
            ValueError: FTP mode is enabled and a message is not an ``ftp://`` URL.
        """
        # Avoid running too frequently for expensive tasks
        now_ms = time.time() * 1000
        if now_ms < self.last_run_start_ms + self.min_interval_ms:
            return False, None
        self.last_run_start_ms = now_ms

        # Lazy data routing: only fetch data from FTP counterpart when we actually need it.
        local_file_path = None
        if self.ftp:
            for i, msg in enumerate(msgs):
                # Payloads consumed with BytesSchema arrive as bytes unless gate_in decodes them;
                # str.startswith on bytes would raise TypeError, so normalise to str first.
                if isinstance(msg, (bytes, bytearray)):
                    try:
                        msg = bytes(msg).decode('utf-8')
                    except UnicodeDecodeError:
                        msg = None
                if not isinstance(msg, str) or not msg.startswith('ftp://'):
                    raise ValueError("FTP mode is enabled but incoming message does not have FTP format")
                local_file_path = ftp_fetch(msg, self.local_ftp_path, memory=self.ftp_memory, delete=self.ftp_delete)
                msgs[i] = local_file_path

        output = self.task(msgs)
        last_run_finish_ms = time.time() * 1000

        # Filename: for now, use the completion timestamp as the filename of output FTP file
        # Batching: for now, we batch the output of messages received in batch in case of lazy data routing.
        if self.ftp and not self.ftp_memory:
            # The output goes next to the last fetched file.  An empty batch fetched nothing, so
            # fall back to the local FTP root instead of failing on an unbound variable.
            base_dir = self.local_ftp_path if local_file_path is None else os.path.dirname(local_file_path)
            ftp_output_dir = os.path.join(base_dir, 'ftp_output')
            pathlib.Path(ftp_output_dir).mkdir(exist_ok=True)
            output_path = os.path.join(ftp_output_dir, str(last_run_finish_ms)) + '.ftp'
            with open(output_path, 'w') as f:
                f.write(output)
            output = local_to_global_path(output_path, self.local_ftp_path)

        return True, output

    def __iter__(self):
        """Return the instance itself; iteration never raises ``StopIteration`` here."""
        return self

    def __next__(self):
        """Process one batch and return the published output, or ``None`` if nothing was sent.

        Polls ``batch_receive`` until it yields a non-empty batch, runs the task through
        ``_try_task``, optionally pickles the output to the log file, publishes it and
        (when ``acknowledge`` is set) acknowledges every message of the batch.
        """
        messages = self.consumer.batch_receive()
        while len(messages) == 0:
            messages = self.consumer.batch_receive()
        msgs = [self.gate_in(message.data()) for message in messages]

        ret, output = self._try_task(msgs)
        if ret:
            output = self.gate_out(output)

        if output:
            if self.log_path and os.path.isdir(self.log_path):
                with open(os.path.join(self.log_path, self.log_filename + '.output'), 'ab') as f:
                    pickle.dump(output, f)
            # The producer uses BytesSchema, which only accepts bytes; a str output (for
            # example the FTP path produced above) is encoded instead of failing in send().
            payload = output.encode('utf-8') if isinstance(output, str) else output
            self.producer.send(payload)

        # NOTE: messages are acknowledged even when the task was skipped by the rate limit.
        if self.acknowledge:
            for message in messages:
                self.consumer.acknowledge(message)

        return output if output else None
BatchCompute(task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False, ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst', max_time_diff_ms=10000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None, acknowledge=False, batch_max_num_msg=100, batch_max_bytes=10485760, batch_timeout_ms=10)
def __init__(self, task, pulsar_node, worker_id='worker1', gate_in=None, gate_out=None, ftp=False, ftp_memory=False,
             ftp_delete=False, local_ftp_path='/srv/ftp/', topic_in='src', topic_out='dst',
             max_time_diff_ms=10 * 1000, no_overlap=False, min_interval_ms=0, log_path=None, log_filename=None,
             acknowledge=False, batch_max_num_msg=100, batch_max_bytes=10 * 1024 * 1024, batch_timeout_ms=10):
    """Connect to Pulsar and store the configuration.

    Creates a client for ``pulsar_node``, a bytes producer on ``topic_out`` and a shared
    bytes consumer on ``topic_in`` (subscription ``compute-sub``, starting from the earliest
    message) whose batch-receive policy is built from ``batch_max_num_msg``,
    ``batch_max_bytes`` and ``batch_timeout_ms``.  ``gate_in`` / ``gate_out`` default to the
    identity function and ``log_filename`` defaults to the current time in milliseconds.
    All remaining arguments are stored on the instance unchanged.
    """
    self.client = pulsar.Client(pulsar_node)
    self.producer = self.client.create_producer(topic_out, schema=pulsar.schema.BytesSchema())

    receive_policy = pulsar.ConsumerBatchReceivePolicy(batch_max_num_msg, batch_max_bytes, batch_timeout_ms)
    self.consumer = self.client.subscribe(
        topic_in,
        subscription_name='compute-sub',
        consumer_type=ConsumerType.Shared,
        schema=pulsar.schema.BytesSchema(),
        initial_position=InitialPosition.Earliest,
        batch_receive_policy=receive_policy,
    )

    self.task = task
    self.worker_id = worker_id
    self.gate_in = gate_in if gate_in is not None else (lambda x: x)
    self.gate_out = gate_out if gate_out is not None else (lambda x: x)

    self.ftp = ftp  # consider changing this name to ftp_in
    self.local_ftp_path = local_ftp_path
    self.ftp_memory = ftp_memory  # consider changing this name to (negate) ftp_out
    self.ftp_delete = ftp_delete

    self.max_time_diff_ms = max_time_diff_ms
    self.no_overlap = no_overlap
    self.min_interval_ms = min_interval_ms  # prediction frequency
    self.last_run_start_ms = 0

    if log_filename is None:
        log_filename = str(time.time() * 1000)
    self.log_path = log_path
    self.log_filename = log_filename
    self.last_log_duration_ms = -1
    self.acknowledge = acknowledge