edgeserve.data_source
"""Data sources that read items from a stream and publish them to Pulsar."""
import pulsar
from pulsar.schema import AvroSchema
import time
import os
import pickle
import uuid

from edgeserve.message_format import GraphCodec


class DataSource:
    """Iterator that gates each stream item and sends it to a Pulsar topic.

    Args:
        stream: any iterable; it is wrapped with ``iter()``.
        pulsar_node: value passed to ``pulsar.Client``.
        source_id: identifier passed as ``op_from`` when encoding; its
            ``len()`` must be at most 16.
        gate: callable applied to every item; defaults to the identity.
            Returning ``None`` from the gate suppresses the item.
        topic: Pulsar topic the producer is created on.
        log_path: directory for timing logs; logging is skipped when it is
            falsy or not an existing directory.
        log_filename: base name of the log file; defaults to the creation
            time in milliseconds.
    """

    def __init__(self, stream, pulsar_node, source_id, gate=None, topic='src', log_path=None, log_filename=None):
        self.client = pulsar.Client(pulsar_node)
        self.producer = self.client.create_producer(topic, schema=pulsar.schema.BytesSchema())
        self.stream = iter(stream)
        self.gate = (lambda x: x) if gate is None else gate
        assert len(source_id) <= 16, 'source_id must be at most 16 bytes long'
        self.source_id = source_id
        self.log_path = log_path
        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    def __iter__(self):
        return self

    def __next__(self):
        """Send the next stream item and return the gated payload.

        Returns ``None`` (without sending) when the stream yields ``None`` or
        the gate returns ``None``. ``StopIteration`` from the underlying
        stream propagates unchanged.
        """
        incoming = next(self.stream)
        if incoming is None:
            return None
        data = self.gate(incoming)
        if data is None:
            return None
        data_collection_time_ms = time.time() * 1000

        msg_uuid = uuid.uuid4()
        message = self.graph_codec.encode(msg_uuid=msg_uuid, op_from=self.source_id, payload=data)
        self.producer.send(message)
        msg_sent_time_ms = time.time() * 1000

        # If log_path is an existing directory, append timestamps to a log file.
        if self.log_path and os.path.isdir(self.log_path):
            replay_log = {'msg_uuid': msg_uuid,
                          'data_collection_time_ms': data_collection_time_ms,
                          'msg_sent_time_ms': msg_sent_time_ms}
            with open(os.path.join(self.log_path, self.log_filename + '.datasource'), 'ab') as f:
                pickle.dump(replay_log, f)

        return data


class CodeSource(DataSource):
    """DataSource with the fixed id ``'code-source'``; items are UTF-8 encoded by default."""

    def __init__(self, stream, pulsar_node, gate=lambda x: x.encode('utf-8'), topic='src'):
        super().__init__(stream, pulsar_node, 'code-source', gate, topic)


class CameraSource(DataSource):
    """DataSource that saves camera frames as JPEG files and publishes their paths.

    Args:
        local_ftp_path: str, directory frames are written to,
            e.g. '/srv/ftp/files/'.
        global_ftp_path: str, prefix of the path that is published,
            e.g. 'ftp://192.168.1.101/files/'.
        width, height: requested capture frame size.
        cam_id: value passed to ``cv2.VideoCapture``.
    """

    def __init__(self, pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id,
                 gate=None, topic='src', cam_id=0):
        # self.stream() only creates the generator; its body first runs on the
        # first next(), by which time the attributes below are set.
        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
        self.cam_id = cam_id
        self.local_ftp_path = local_ftp_path
        self.global_ftp_path = global_ftp_path
        self.width = width
        self.height = height

    def stream(self):
        """Yield the published path of each captured frame until a read fails."""
        import cv2
        cap = cv2.VideoCapture(self.cam_id)
        try:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
            while cap.isOpened():
                ret, frame = cap.read()
                if ret is not True:
                    break
                cur_time = str(time.time())
                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
                yield self.global_ftp_path + '/' + cur_time + '.jpg'
        finally:
            # Runs even when the consumer abandons the generator early.
            cap.release()
            cv2.destroyAllWindows()


# read in a csv file with first column as timestamp in seconds.
class SimulateTimeSeries(DataSource):
    """Replay CSV rows, spacing them by the timestamps in the first column."""

    def __init__(self, filename, pulsar_node, source_id, gate=None, topic='src'):
        # The generator is lazy, so the lists filled below are visible to it.
        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
        self.timestamps = []
        self.values = []
        self._csv_to_list(filename)

    def _csv_to_list(self, filename):
        """Load timestamps (column 0) and values (remaining columns) from the CSV."""
        import csv
        with open(filename, 'r') as f:
            reader = csv.reader(f)
            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines; nothing to replay.
                    continue
                self.timestamps.append(row[0])
                self.values.append(row[1:])

    def stream(self):
        """Yield each row's values, sleeping so gaps match the recorded timestamps."""
        if len(self.timestamps) == 0:
            # A plain return ends a generator; raising StopIteration here
            # becomes RuntimeError under PEP 479.
            return

        first_stamp = float(self.timestamps[0])
        started_at = time.time()
        for stamp, value in zip(self.timestamps, self.values):
            # Keep one fixed offset between recorded time and wall-clock time
            # so the replay does not drift ahead after each sleep.
            delay = started_at + (float(stamp) - first_stamp) - time.time()
            if delay > 0:
                time.sleep(delay)
            yield value


class SimulateVideoWithTimestamps(CameraSource):
    """CameraSource that emits one frame per entry of ``timestamps``.

    Before each emitted frame, ``int(fps * timestamps[i])`` extra frames are
    read and discarded (presumably each entry is a gap in seconds; confirm
    with callers).
    """

    def __init__(self, timestamps, pulsar_node, local_ftp_path, global_ftp_path, width, height,
                 source_id, gate=None, topic='src', cam_id=0):
        super().__init__(pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate, topic, cam_id)
        self.timestamps = timestamps

    def stream(self):
        """Yield the published path of one frame per timestamp entry."""
        if len(self.timestamps) == 0:
            # Plain return: raising StopIteration in a generator is a
            # RuntimeError under PEP 479.
            return

        import cv2
        cap = cv2.VideoCapture(self.cam_id)
        try:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
            fps = cap.get(cv2.CAP_PROP_FPS)

            # Iterating the timestamps ends the stream when they are
            # exhausted instead of indexing past the end.
            for gap in self.timestamps:
                if not cap.isOpened():
                    break
                ret, frame = cap.read()
                for _ in range(int(fps * gap)):
                    ret, frame = cap.read()  # skip frames
                if ret is not True:
                    break
                cur_time = str(time.time())
                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
                yield self.global_ftp_path + '/' + cur_time + '.jpg'
        finally:
            cap.release()
            cv2.destroyAllWindows()
class
DataSource:
class DataSource:
    """Iterator that gates each stream item and sends it to a Pulsar topic.

    Iterating the object pulls one item from ``stream``, passes it through
    ``gate``, encodes it with a ``GraphCodec`` and sends it with the
    producer. Used as a context manager, it closes the Pulsar client on exit.
    """

    def __init__(self, stream, pulsar_node, source_id, gate=None, topic='src', log_path=None, log_filename=None):
        """Create the Pulsar client/producer and store the configuration.

        ``gate`` defaults to the identity function and ``log_filename`` to
        the current time in milliseconds. ``len(source_id)`` must not
        exceed 16.
        """
        self.client = pulsar.Client(pulsar_node)
        self.producer = self.client.create_producer(topic, schema=pulsar.schema.BytesSchema())
        self.stream = iter(stream)
        if gate is None:
            self.gate = lambda x: x
        else:
            self.gate = gate
        assert len(source_id) <= 16, 'source_id must be at most 16 bytes long'
        self.source_id = source_id
        self.log_path = log_path
        if log_filename is None:
            self.log_filename = str(time.time() * 1000)
        else:
            self.log_filename = log_filename
        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)

    def __enter__(self):
        """Return this source for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the Pulsar client when the ``with`` block ends."""
        self.client.close()

    def __iter__(self):
        """The source is its own iterator."""
        return self

    def __next__(self):
        """Send the next item and return the gated payload.

        ``None`` is returned, and nothing is sent, when either the stream
        item or the gate result is ``None``.
        """
        raw_item = next(self.stream)
        if raw_item is None:
            return None
        payload = self.gate(raw_item)
        if payload is None:
            return None
        collected_at_ms = time.time() * 1000

        message_id = uuid.uuid4()
        encoded = self.graph_codec.encode(msg_uuid=message_id, op_from=self.source_id, payload=payload)
        self.producer.send(encoded)
        sent_at_ms = time.time() * 1000

        # Timing records are appended only when log_path is an existing directory.
        if self.log_path and os.path.isdir(self.log_path):
            record = {
                'msg_uuid': message_id,
                'data_collection_time_ms': collected_at_ms,
                'msg_sent_time_ms': sent_at_ms,
            }
            log_file = os.path.join(self.log_path, self.log_filename + '.datasource')
            with open(log_file, 'ab') as handle:
                pickle.dump(record, handle)

        return payload
DataSource( stream, pulsar_node, source_id, gate=None, topic='src', log_path=None, log_filename=None)
def __init__(self, stream, pulsar_node, source_id, gate=None, topic='src', log_path=None, log_filename=None):
    """Create the Pulsar client/producer and store the configuration.

    ``stream`` is wrapped with ``iter()``. ``gate`` defaults to the identity
    function and ``log_filename`` to the current time in milliseconds.
    ``len(source_id)`` must not exceed 16.
    """
    self.client = pulsar.Client(pulsar_node)
    self.producer = self.client.create_producer(topic, schema=pulsar.schema.BytesSchema())
    self.stream = iter(stream)
    if gate is None:
        self.gate = lambda x: x
    else:
        self.gate = gate
    assert len(source_id) <= 16, 'source_id must be at most 16 bytes long'
    self.source_id = source_id
    self.log_path = log_path
    if log_filename is None:
        self.log_filename = str(time.time() * 1000)
    else:
        self.log_filename = log_filename
    self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
class CodeSource(DataSource):
    """DataSource with the fixed source id ``'code-source'``.

    The default gate encodes each item as UTF-8 by calling
    ``item.encode('utf-8')``.
    """

    def __init__(self, stream, pulsar_node, gate=lambda x: x.encode('utf-8'), topic='src'):
        super().__init__(stream, pulsar_node, 'code-source', gate=gate, topic=topic)
Inherited Members
class CameraSource(DataSource):
    """DataSource that saves camera frames as JPEG files and publishes their paths.

    Args:
        local_ftp_path: str, directory frames are written to,
            e.g. '/srv/ftp/files/'.
        global_ftp_path: str, prefix of the path that is published,
            e.g. 'ftp://192.168.1.101/files/'.
        width, height: requested capture frame size.
        cam_id: value passed to ``cv2.VideoCapture``.
    """

    def __init__(self, pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id,
                 gate=None, topic='src', cam_id=0):
        # self.stream() only creates the generator; its body first runs on the
        # first next(), by which time the attributes below are set.
        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
        self.cam_id = cam_id
        self.local_ftp_path = local_ftp_path
        self.global_ftp_path = global_ftp_path
        self.width = width
        self.height = height

    def stream(self):
        """Yield the published path of each captured frame until a read fails."""
        import cv2
        from time import time
        cap = cv2.VideoCapture(self.cam_id)
        try:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
            while cap.isOpened():
                ret, frame = cap.read()
                if ret is not True:
                    break
                cur_time = str(time())
                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
                yield self.global_ftp_path + '/' + cur_time + '.jpg'
        finally:
            # Runs even when the consumer abandons the generator early, so the
            # capture device is not left open.
            cap.release()
            cv2.destroyAllWindows()
CameraSource( pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate=None, topic='src', cam_id=0)
def __init__(self, pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id,
             gate=None, topic='src', cam_id=0):
    """Register the frame generator with the base class and store camera settings.

    Calling ``self.stream()`` here only creates the generator object; its
    body does not run until the first item is requested, so the attributes
    assigned afterwards are in place by then.
    """
    super().__init__(self.stream(), pulsar_node, source_id, gate=gate, topic=topic)
    self.width = width
    self.height = height
    self.cam_id = cam_id
    self.local_ftp_path = local_ftp_path
    self.global_ftp_path = global_ftp_path
def
stream(self):
def stream(self):
    """Yield the published path of each captured frame until a read fails.

    Each frame is written to ``local_ftp_path`` as ``<time>.jpg`` and the
    matching path under ``global_ftp_path`` is yielded.
    """
    import cv2
    from time import time
    cap = cv2.VideoCapture(self.cam_id)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
        while cap.isOpened():
            ret, frame = cap.read()
            if ret is not True:
                break
            cur_time = str(time())
            cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
            yield self.global_ftp_path + '/' + cur_time + '.jpg'
    finally:
        # Runs even when the consumer abandons the generator early, so the
        # capture device is not left open.
        cap.release()
        cv2.destroyAllWindows()
Inherited Members
class SimulateTimeSeries(DataSource):
    """Replay CSV rows, spacing them by the timestamps in the first column.

    The first column of each row is the timestamp in seconds; the remaining
    columns are the values that get published.
    """

    def __init__(self, filename, pulsar_node, source_id, gate=None, topic='src'):
        # self.stream() only creates the generator; its body runs on the first
        # next(), after the lists below have been filled.
        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
        self.timestamps = []
        self.values = []
        self._csv_to_list(filename)

    def _csv_to_list(self, filename):
        """Load timestamps (column 0) and values (remaining columns) from the CSV."""
        import csv
        with open(filename, 'r') as f:
            reader = csv.reader(f)
            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines; nothing to replay.
                    continue
                self.timestamps.append(row[0])
                self.values.append(row[1:])

    def stream(self):
        """Yield each row's values, sleeping so gaps match the recorded timestamps."""
        if len(self.timestamps) == 0:
            # A plain return ends a generator; raising StopIteration here
            # becomes RuntimeError under PEP 479.
            return

        import time
        first_stamp = float(self.timestamps[0])
        started_at = time.time()
        for stamp, value in zip(self.timestamps, self.values):
            # Keep one fixed offset between recorded time and wall-clock time
            # so the replay does not drift ahead after each sleep.
            delay = started_at + (float(stamp) - first_stamp) - time.time()
            if delay > 0:
                time.sleep(delay)
            yield value
def
stream(self):
def stream(self):
    """Yield each row's values, sleeping so gaps match the recorded timestamps.

    Ends immediately when there are no timestamps. Rows whose timestamp is
    not later than the replay position are yielded without sleeping.
    """
    if len(self.timestamps) == 0:
        # A plain return ends a generator; raising StopIteration here
        # becomes RuntimeError under PEP 479.
        return

    import time
    first_stamp = float(self.timestamps[0])
    started_at = time.time()
    for stamp, value in zip(self.timestamps, self.values):
        # Keep one fixed offset between recorded time and wall-clock time so
        # the replay does not drift ahead after each sleep.
        delay = started_at + (float(stamp) - first_stamp) - time.time()
        if delay > 0:
            time.sleep(delay)
        yield value
Inherited Members
class SimulateVideoWithTimestamps(CameraSource):
    """CameraSource that emits one frame per entry of ``timestamps``.

    Before each emitted frame, ``int(fps * timestamps[i])`` extra frames are
    read and discarded (presumably each entry is a gap in seconds; confirm
    with callers).
    """

    def __init__(self, timestamps, pulsar_node, local_ftp_path, global_ftp_path, width, height,
                 source_id, gate=None, topic='src', cam_id=0):
        super().__init__(pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate, topic, cam_id)
        self.timestamps = timestamps

    def stream(self):
        """Yield the published path of one frame per timestamp entry."""
        if len(self.timestamps) == 0:
            # Plain return: raising StopIteration in a generator is a
            # RuntimeError under PEP 479.
            return

        import cv2
        from time import time
        cap = cv2.VideoCapture(self.cam_id)
        try:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
            fps = cap.get(cv2.CAP_PROP_FPS)

            # Iterating the timestamps ends the stream when they are
            # exhausted instead of indexing past the end.
            for gap in self.timestamps:
                if not cap.isOpened():
                    break
                ret, frame = cap.read()
                for _ in range(int(fps * gap)):
                    ret, frame = cap.read()  # skip frames
                if ret is not True:
                    break
                cur_time = str(time())
                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
                yield self.global_ftp_path + '/' + cur_time + '.jpg'
        finally:
            # Release the capture even if the consumer stops early.
            cap.release()
            cv2.destroyAllWindows()
SimulateVideoWithTimestamps( timestamps, pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate=None, topic='src', cam_id=0)
def
stream(self):
def stream(self):
    """Yield the published path of one frame per timestamp entry.

    For each entry, one frame is read and then ``int(fps * entry)`` further
    frames are read and discarded; the last frame read is saved under
    ``local_ftp_path`` and its ``global_ftp_path`` counterpart is yielded.
    The stream ends when the timestamps run out, the capture closes, or a
    read fails.
    """
    if len(self.timestamps) == 0:
        # Plain return: raising StopIteration in a generator is a
        # RuntimeError under PEP 479.
        return

    import cv2
    from time import time
    cap = cv2.VideoCapture(self.cam_id)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
        fps = cap.get(cv2.CAP_PROP_FPS)

        # Iterating the timestamps ends the stream when they are exhausted
        # instead of indexing past the end.
        for gap in self.timestamps:
            if not cap.isOpened():
                break
            ret, frame = cap.read()
            for _ in range(int(fps * gap)):
                ret, frame = cap.read()  # skip frames
            if ret is not True:
                break
            cur_time = str(time())
            cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
            yield self.global_ftp_path + '/' + cur_time + '.jpg'
    finally:
        # Release the capture even if the consumer stops early.
        cap.release()
        cv2.destroyAllWindows()