Skip to content

Commit 16556ff

Browse files
committed
Add possibility to acquire bulk readings in JSON format
1 parent 7a4dc1a commit 16556ff

File tree

5 files changed

+112
-1
lines changed

5 files changed

+112
-1
lines changed

CHANGES.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ Changelog
33
*********
44

55

6+
incomplete
7+
==========
8+
- Add possibility to acquire bulk readings in JSON format
9+
10+
611
in progress
712
===========
813
- Add basic information about RabbitMQ

kotori/daq/graphing/grafana/dashboard.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,19 @@ def collect_fields(data, prefixes=None, sorted=True):
275275
# time is from intercom.mqtt
276276
blacklist = ['_hex_', 'time']
277277

278+
# Compute list of unique attribute names.
279+
if isinstance(data, dict):
280+
keys = data.keys()
281+
elif isinstance(data, list):
282+
keys = set()
283+
for item in data:
284+
for key in item.keys():
285+
keys.add(key)
286+
else:
287+
raise ValueError(f"Type of data {type(data)} not accepted")
288+
278289
fields = []
279-
for field in data.keys():
290+
for field in keys:
280291
if field in blacklist:
281292
continue
282293

kotori/daq/storage/influx.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ def is_udp_database(self, name):
6464
return False
6565

6666
def write(self, meta, data):
67+
if isinstance(data, dict):
68+
self.write_single(meta, data)
69+
elif isinstance(data, list):
70+
for item in data:
71+
self.write_single(meta, item)
72+
else:
73+
raise ValueError(f"Type of data {type(data)} not accepted")
74+
75+
def write_single(self, meta, data):
6776

6877
meta_copy = deepcopy(dict(meta))
6978
data_copy = deepcopy(data)

test/test_daq_grafana.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,55 @@ def test_mqtt_to_grafana_two_dashboards(machinery, create_influxdb, reset_influx
171171
titles = grafana.get_dashboard_titles()
172172
assert settings.grafana_dashboards[0] in titles
173173
assert settings.grafana_dashboards[1] in titles
174+
175+
176+
@pytest_twisted.inlineCallbacks
177+
@pytest.mark.grafana
178+
def test_mqtt_to_grafana_bulk(machinery, create_influxdb, reset_influxdb, reset_grafana):
179+
"""
180+
Publish multiple readings in JSON format to MQTT broker and proof
181+
that a corresponding datasource and a dashboard was created in Grafana.
182+
"""
183+
184+
# Submit multiple measurements, without timestamp.
185+
data = [
186+
{
187+
'temperature': 21.42,
188+
'humidity': 41.55,
189+
},
190+
{
191+
'temperature': 42.84,
192+
'humidity': 83.1,
193+
'voltage': 4.2,
194+
},
195+
{
196+
'weight': 10.10,
197+
},
198+
]
199+
yield mqtt_json_sensor(settings.mqtt_topic_json, data)
200+
201+
# Wait for some time to process the message.
202+
yield sleep(PROCESS_DELAY_MQTT)
203+
yield sleep(PROCESS_DELAY_MQTT)
204+
yield sleep(PROCESS_DELAY_MQTT)
205+
206+
# Proof that Grafana is well provisioned.
207+
logger.info('Grafana: Checking datasource')
208+
datasource_names = []
209+
for datasource in grafana.client.datasources.get():
210+
datasource_names.append(datasource['name'])
211+
assert settings.influx_database in datasource_names
212+
213+
logger.info('Grafana: Checking dashboard')
214+
dashboard_name = settings.grafana_dashboards[0]
215+
dashboard = grafana.get_dashboard_by_name(dashboard_name)
216+
targets = dashboard['rows'][0]['panels'][0]['targets']
217+
218+
# Validate table name.
219+
assert targets[0]['measurement'] == settings.influx_measurement_sensors
220+
221+
# Validate field names.
222+
fields = set()
223+
for target in targets:
224+
fields.add(target["fields"][0]["name"])
225+
assert fields == set(["temperature", "humidity", "weight", "voltage"])

test/test_daq_mqtt.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,40 @@ def test_mqtt_to_influxdb_json_single(machinery, create_influxdb, reset_influxdb
3737
yield record
3838

3939

40+
@pytest_twisted.inlineCallbacks
41+
@pytest.mark.mqtt
42+
def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb):
43+
"""
44+
Publish multiple readings in JSON format to MQTT broker
45+
and proof it is stored in the InfluxDB database.
46+
"""
47+
48+
# Submit multiple measurements, without timestamp.
49+
data = [
50+
{
51+
'temperature': 21.42,
52+
'humidity': 41.55,
53+
},
54+
{
55+
'temperature': 42.84,
56+
'humidity': 83.1,
57+
},
58+
]
59+
yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json, data)
60+
61+
# Wait for some time to process the message.
62+
yield sleep(PROCESS_DELAY_MQTT)
63+
64+
# Proof that data arrived in InfluxDB.
65+
record = influx_sensors.get_record(index=0)
66+
del record['time']
67+
assert record == {u'temperature': 21.42, u'humidity': 41.55}
68+
69+
record = influx_sensors.get_record(index=1)
70+
del record['time']
71+
assert record == {u'temperature': 42.84, u'humidity': 83.1}
72+
73+
4074
@pytest_twisted.inlineCallbacks
4175
@pytest.mark.mqtt
4276
@pytest.mark.legacy

0 commit comments

Comments
 (0)