edgeserve.data_source

  1import pulsar
  2from pulsar.schema import AvroSchema
  3import time
  4import os
  5import pickle
  6import uuid
  7
  8from edgeserve.message_format import GraphCodec
  9
 10
 11class DataSource:
 12    def __init__(self, stream, pulsar_node, source_id, gate=None, topic='src', log_path=None, log_filename=None):
 13        self.client = pulsar.Client(pulsar_node)
 14        self.producer = self.client.create_producer(topic, schema=pulsar.schema.BytesSchema())
 15        self.stream = iter(stream)
 16        self.gate = (lambda x: x) if gate is None else gate
 17        assert len(source_id) <= 16, 'source_id must be at most 16 bytes long'
 18        self.source_id = source_id
 19        self.log_path = log_path
 20        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
 21        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
 22
 23    def __enter__(self):
 24        return self
 25
 26    def __exit__(self, exc_type, exc_val, exc_tb):
 27        self.client.close()
 28
 29    def __iter__(self):
 30        return self
 31
 32    def __next__(self):
 33        incoming = next(self.stream)
 34        if incoming is None:
 35            return None
 36        data = self.gate(incoming)
 37        if data is None:
 38            return None
 39        data_collection_time_ms = time.time() * 1000
 40
 41        msg_uuid = uuid.uuid4()
 42        message = self.graph_codec.encode(msg_uuid=msg_uuid, op_from=self.source_id, payload=data)
 43        self.producer.send(message)
 44        msg_sent_time_ms = time.time() * 1000
 45
 46        # If log_path is not None, we write timestamps to a log file.
 47        if self.log_path and os.path.isdir(self.log_path):
 48            replay_log = {'msg_uuid': msg_uuid,
 49                          'data_collection_time_ms': data_collection_time_ms,
 50                          'msg_sent_time_ms': msg_sent_time_ms}
 51            with open(os.path.join(self.log_path, self.log_filename + '.datasource'), 'ab') as f:
 52                pickle.dump(replay_log, f)
 53
 54        return data
 55
 56
 57class CodeSource(DataSource):
 58    def __init__(self, stream, pulsar_node, gate=lambda x: x.encode('utf-8'), topic='src'):
 59        super().__init__(stream, pulsar_node, 'code-source', gate, topic)
 60
 61
 62class CameraSource(DataSource):
 63    # local_ftp_path: str, e.g. '/srv/ftp/files/'
 64    # remote_ftp_path: str, e.g. 'ftp://192.168.1.101/files/'
 65    def __init__(self, pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id,
 66                 gate=None, topic='src', cam_id=0):
 67        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
 68        self.cam_id = cam_id
 69        self.local_ftp_path = local_ftp_path
 70        self.global_ftp_path = global_ftp_path
 71        self.width = width
 72        self.height = height
 73
 74    def stream(self):
 75        import cv2
 76        from time import time
 77        cap = cv2.VideoCapture(self.cam_id)
 78        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
 79        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
 80        while cap.isOpened():
 81            ret, frame = cap.read()
 82            if ret is True:
 83                cur_time = str(time())
 84                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
 85                yield self.global_ftp_path + '/' + cur_time + '.jpg'
 86            else:
 87                break
 88        cap.release()
 89        cv2.destroyAllWindows()
 90
 91
 92# read in a csv file with first column as timestamp in seconds.
 93class SimulateTimeSeries(DataSource):
 94    def __init__(self, filename, pulsar_node, source_id, gate=None, topic='src'):
 95        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
 96        self.timestamps = []
 97        self.values = []
 98        self._csv_to_list(filename)
 99
100    def _csv_to_list(self, filename):
101        import csv
102        with open(filename, 'r') as f:
103            reader = csv.reader(f)
104            for row in reader:
105                self.timestamps.append(row[0])
106                self.values.append(row[1:])
107
108    def stream(self):
109        if len(self.timestamps) == 0:
110            raise StopIteration
111
112        import time
113        time_diff = time.time() - float(self.timestamps[0])
114        for i in range(len(self.timestamps)):
115            this_time_diff = time.time() - float(self.timestamps[i])
116            if this_time_diff < time_diff:
117                time.sleep(time_diff - this_time_diff)
118            time_diff = this_time_diff
119            yield self.values[i]
120
121
class SimulateVideoWithTimestamps(CameraSource):
    """Camera source that emits one frame per entry of ``timestamps``.

    For every entry one frame is read, then ``int(fps * entry)`` further
    frames are read and discarded; the last frame read is saved and its
    global path is yielded.
    """

    def __init__(self, timestamps, pulsar_node, local_ftp_path, global_ftp_path, width, height,
                 source_id, gate=None, topic='src', cam_id=0):
        """Store ``timestamps`` and defer the rest to CameraSource.

        Args:
            timestamps: Sequence of numbers; each is multiplied by the capture
                FPS to obtain the number of frames to skip.
            pulsar_node, local_ftp_path, global_ftp_path, width, height,
            source_id, gate, topic, cam_id: Forwarded unchanged to
                ``CameraSource.__init__``.
        """
        # Set before the parent constructor creates the stream generator.
        self.timestamps = timestamps
        super().__init__(pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate, topic, cam_id)

    def stream(self):
        """Yield one global file path per timestamp entry.

        The generator ends when the timestamps are exhausted, the capture is
        no longer open, or a read fails.  The capture device is released in
        all cases.
        """
        if len(self.timestamps) == 0:
            # A plain return ends a generator; raising StopIteration here
            # would be turned into RuntimeError (PEP 479).
            return

        import cv2
        from time import time
        cap = cv2.VideoCapture(self.cam_id)
        try:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
            fps = cap.get(cv2.CAP_PROP_FPS)

            # Iterating the timestamps directly ends the stream cleanly once
            # they run out, instead of indexing past the end of the sequence.
            for skip_seconds in self.timestamps:
                if not cap.isOpened():
                    break
                ret, frame = cap.read()
                for _ in range(int(fps * skip_seconds)):
                    ret, frame = cap.read()  # skip frames
                if ret is not True:
                    break
                cur_time = str(time())
                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
                yield self.global_ftp_path + '/' + cur_time + '.jpg'
        finally:
            # Runs on normal exhaustion, on generator close and on errors.
            cap.release()
            cv2.destroyAllWindows()
class DataSource:
12class DataSource:
13    def __init__(self, stream, pulsar_node, source_id, gate=None, topic='src', log_path=None, log_filename=None):
14        self.client = pulsar.Client(pulsar_node)
15        self.producer = self.client.create_producer(topic, schema=pulsar.schema.BytesSchema())
16        self.stream = iter(stream)
17        self.gate = (lambda x: x) if gate is None else gate
18        assert len(source_id) <= 16, 'source_id must be at most 16 bytes long'
19        self.source_id = source_id
20        self.log_path = log_path
21        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
22        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
23
24    def __enter__(self):
25        return self
26
27    def __exit__(self, exc_type, exc_val, exc_tb):
28        self.client.close()
29
30    def __iter__(self):
31        return self
32
33    def __next__(self):
34        incoming = next(self.stream)
35        if incoming is None:
36            return None
37        data = self.gate(incoming)
38        if data is None:
39            return None
40        data_collection_time_ms = time.time() * 1000
41
42        msg_uuid = uuid.uuid4()
43        message = self.graph_codec.encode(msg_uuid=msg_uuid, op_from=self.source_id, payload=data)
44        self.producer.send(message)
45        msg_sent_time_ms = time.time() * 1000
46
47        # If log_path is not None, we write timestamps to a log file.
48        if self.log_path and os.path.isdir(self.log_path):
49            replay_log = {'msg_uuid': msg_uuid,
50                          'data_collection_time_ms': data_collection_time_ms,
51                          'msg_sent_time_ms': msg_sent_time_ms}
52            with open(os.path.join(self.log_path, self.log_filename + '.datasource'), 'ab') as f:
53                pickle.dump(replay_log, f)
54
55        return data
DataSource( stream, pulsar_node, source_id, gate=None, topic='src', log_path=None, log_filename=None)
13    def __init__(self, stream, pulsar_node, source_id, gate=None, topic='src', log_path=None, log_filename=None):
14        self.client = pulsar.Client(pulsar_node)
15        self.producer = self.client.create_producer(topic, schema=pulsar.schema.BytesSchema())
16        self.stream = iter(stream)
17        self.gate = (lambda x: x) if gate is None else gate
18        assert len(source_id) <= 16, 'source_id must be at most 16 bytes long'
19        self.source_id = source_id
20        self.log_path = log_path
21        self.log_filename = str(time.time() * 1000) if log_filename is None else log_filename
22        self.graph_codec = GraphCodec(msg_uuid_size=16, op_from_size=16, header_size=0)
client
producer
stream
gate
source_id
log_path
log_filename
graph_codec
class CodeSource(DataSource):
58class CodeSource(DataSource):
59    def __init__(self, stream, pulsar_node, gate=lambda x: x.encode('utf-8'), topic='src'):
60        super().__init__(stream, pulsar_node, 'code-source', gate, topic)
CodeSource( stream, pulsar_node, gate=<function CodeSource.<lambda>>, topic='src')
59    def __init__(self, stream, pulsar_node, gate=lambda x: x.encode('utf-8'), topic='src'):
60        super().__init__(stream, pulsar_node, 'code-source', gate, topic)
class CameraSource(DataSource):
63class CameraSource(DataSource):
64    # local_ftp_path: str, e.g. '/srv/ftp/files/'
65    # remote_ftp_path: str, e.g. 'ftp://192.168.1.101/files/'
66    def __init__(self, pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id,
67                 gate=None, topic='src', cam_id=0):
68        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
69        self.cam_id = cam_id
70        self.local_ftp_path = local_ftp_path
71        self.global_ftp_path = global_ftp_path
72        self.width = width
73        self.height = height
74
75    def stream(self):
76        import cv2
77        from time import time
78        cap = cv2.VideoCapture(self.cam_id)
79        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
80        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
81        while cap.isOpened():
82            ret, frame = cap.read()
83            if ret is True:
84                cur_time = str(time())
85                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
86                yield self.global_ftp_path + '/' + cur_time + '.jpg'
87            else:
88                break
89        cap.release()
90        cv2.destroyAllWindows()
CameraSource( pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate=None, topic='src', cam_id=0)
66    def __init__(self, pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id,
67                 gate=None, topic='src', cam_id=0):
68        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
69        self.cam_id = cam_id
70        self.local_ftp_path = local_ftp_path
71        self.global_ftp_path = global_ftp_path
72        self.width = width
73        self.height = height
cam_id
local_ftp_path
global_ftp_path
width
height
def stream(self):
75    def stream(self):
76        import cv2
77        from time import time
78        cap = cv2.VideoCapture(self.cam_id)
79        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
80        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
81        while cap.isOpened():
82            ret, frame = cap.read()
83            if ret is True:
84                cur_time = str(time())
85                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
86                yield self.global_ftp_path + '/' + cur_time + '.jpg'
87            else:
88                break
89        cap.release()
90        cv2.destroyAllWindows()
class SimulateTimeSeries(DataSource):
 94class SimulateTimeSeries(DataSource):
 95    def __init__(self, filename, pulsar_node, source_id, gate=None, topic='src'):
 96        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
 97        self.timestamps = []
 98        self.values = []
 99        self._csv_to_list(filename)
100
101    def _csv_to_list(self, filename):
102        import csv
103        with open(filename, 'r') as f:
104            reader = csv.reader(f)
105            for row in reader:
106                self.timestamps.append(row[0])
107                self.values.append(row[1:])
108
109    def stream(self):
110        if len(self.timestamps) == 0:
111            raise StopIteration
112
113        import time
114        time_diff = time.time() - float(self.timestamps[0])
115        for i in range(len(self.timestamps)):
116            this_time_diff = time.time() - float(self.timestamps[i])
117            if this_time_diff < time_diff:
118                time.sleep(time_diff - this_time_diff)
119            time_diff = this_time_diff
120            yield self.values[i]
SimulateTimeSeries(filename, pulsar_node, source_id, gate=None, topic='src')
95    def __init__(self, filename, pulsar_node, source_id, gate=None, topic='src'):
96        super().__init__(self.stream(), pulsar_node, source_id, gate, topic)
97        self.timestamps = []
98        self.values = []
99        self._csv_to_list(filename)
timestamps
values
def stream(self):
109    def stream(self):
110        if len(self.timestamps) == 0:
111            raise StopIteration
112
113        import time
114        time_diff = time.time() - float(self.timestamps[0])
115        for i in range(len(self.timestamps)):
116            this_time_diff = time.time() - float(self.timestamps[i])
117            if this_time_diff < time_diff:
118                time.sleep(time_diff - this_time_diff)
119            time_diff = this_time_diff
120            yield self.values[i]
class SimulateVideoWithTimestamps(CameraSource):
123class SimulateVideoWithTimestamps(CameraSource):
124    def __init__(self, timestamps, pulsar_node, local_ftp_path, global_ftp_path, width, height,
125                 source_id, gate=None, topic='src', cam_id=0):
126        super().__init__(pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate, topic, cam_id)
127        self.timestamps = timestamps
128
129    def stream(self):
130        if len(self.timestamps) == 0:
131            raise StopIteration
132
133        import cv2
134        from time import time
135        cap = cv2.VideoCapture(self.cam_id)
136        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
137        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
138        fps = cap.get(cv2.CAP_PROP_FPS)
139        i = 0
140
141        while cap.isOpened():
142            ret, frame = cap.read()
143            for _ in range(int(fps * self.timestamps[i])):
144                ret, frame = cap.read()  # skip frames
145            i += 1
146            if ret is True:
147                cur_time = str(time())
148                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
149                yield self.global_ftp_path + '/' + cur_time + '.jpg'
150            else:
151                break
152        cap.release()
153        cv2.destroyAllWindows()
SimulateVideoWithTimestamps( timestamps, pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate=None, topic='src', cam_id=0)
124    def __init__(self, timestamps, pulsar_node, local_ftp_path, global_ftp_path, width, height,
125                 source_id, gate=None, topic='src', cam_id=0):
126        super().__init__(pulsar_node, local_ftp_path, global_ftp_path, width, height, source_id, gate, topic, cam_id)
127        self.timestamps = timestamps
timestamps
def stream(self):
129    def stream(self):
130        if len(self.timestamps) == 0:
131            raise StopIteration
132
133        import cv2
134        from time import time
135        cap = cv2.VideoCapture(self.cam_id)
136        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
137        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
138        fps = cap.get(cv2.CAP_PROP_FPS)
139        i = 0
140
141        while cap.isOpened():
142            ret, frame = cap.read()
143            for _ in range(int(fps * self.timestamps[i])):
144                ret, frame = cap.read()  # skip frames
145            i += 1
146            if ret is True:
147                cur_time = str(time())
148                cv2.imwrite(self.local_ftp_path + '/' + cur_time + '.jpg', frame)
149                yield self.global_ftp_path + '/' + cur_time + '.jpg'
150            else:
151                break
152        cap.release()
153        cv2.destroyAllWindows()