Skip to content

Commit 7e9ba03

Browse files
committed
Add possibility to acquire bulk readings in compact JSON format
The "compact JSON" format is currently defined with timestamps as keys. Example: { "1611082554": { "temperature": 21.42, "humidity": 41.55 }, "1611082568": { "temperature": 42.84, "humidity": 83.1 } }
1 parent 7220f29 commit 7e9ba03

File tree

5 files changed

+88
-1
lines changed

5 files changed

+88
-1
lines changed

CHANGES.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ Changelog
66
incomplete
77
==========
88
- Add possibility to acquire bulk readings in JSON format
9+
- Add possibility to acquire bulk readings in compact JSON format,
10+
with timestamps as keys
911

1012

1113
in progress
@@ -50,7 +52,6 @@ in progress
5052
- QA: Improve tests on HTTP API for data acquisition
5153
- CI: Add testing against Python 3.9
5254
- CI: Run tests against different versions of Mosquitto, InfluxDB and Grafana
53-
- Add possibility to acquire bulk readings in JSON format
5455

5556

5657
.. _kotori-0.26.8:

kotori/daq/decoder/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- coding: utf-8 -*-
22
# (c) 2019-2021 Andreas Motl <[email protected]>
33
from kotori.daq.decoder.airrohr import AirrohrDecoder
4+
from kotori.daq.decoder.json import CompactTimestampedJsonDecoder
45
from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder
56
from kotori.daq.decoder.schema import MessageType
67

@@ -23,6 +24,12 @@ def probe(self):
2324
if 'slot' not in self.topology:
2425
return False
2526

27+
# Compact JSON format, with timestamps as keys
28+
if self.topology.slot.endswith('tc.json'):
29+
self.info.message_type = MessageType.DATA_CONTAINER
30+
self.info.decoder = CompactTimestampedJsonDecoder
31+
return True
32+
2633
# Airrohr
2734
if self.topology.slot.endswith('airrohr.json'):
2835
self.info.message_type = MessageType.DATA_CONTAINER

kotori/daq/decoder/json.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# -*- coding: utf-8 -*-
2+
# (c) 2021 Andreas Motl <[email protected]>
3+
import json
4+
5+
6+
class CompactTimestampedJsonDecoder:
7+
"""
8+
Decode JSON payloads in compact format, with timestamps as keys.
9+
10+
Documentation
11+
=============
12+
- https://getkotori.org/docs/handbook/decoders/json.html (not yet)
13+
- https://github.com/daq-tools/kotori/issues/39
14+
15+
Example
16+
=======
17+
::
18+
19+
{
20+
"1611082554": {
21+
"temperature": 21.42,
22+
"humidity": 41.55
23+
},
24+
"1611082568": {
25+
"temperature": 42.84,
26+
"humidity": 83.1
27+
}
28+
}
29+
30+
"""
31+
32+
@staticmethod
33+
def decode(payload):
34+
35+
# Decode from JSON.
36+
message = json.loads(payload)
37+
38+
# Create list of data dictionaries.
39+
data = []
40+
for timestamp, item in message.items():
41+
item["time"] = timestamp
42+
data.append(item)
43+
44+
return data

test/settings/mqttkit.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class TestSettings:
2727
mqtt_topic3_json = 'mqttkit-1/itest3/foo/bar/data.json'
2828
mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json'
2929
mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__'
30+
mqtt_topic_json_compact = 'mqttkit-1/itest/foo/bar/tc.json'
3031
mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json'
3132

3233
# HTTP channel settings.

test/test_daq_mqtt.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,40 @@ def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb):
7171
assert record == {u'temperature': 42.84, u'humidity': 83.1}
7272

7373

74+
@pytest_twisted.inlineCallbacks
75+
@pytest.mark.mqtt
76+
def test_mqtt_to_influxdb_json_compact_bulk(machinery, create_influxdb, reset_influxdb):
77+
"""
78+
Publish multiple readings in compact JSON format to MQTT broker
79+
and proof they are stored in the InfluxDB database.
80+
81+
https://github.com/daq-tools/kotori/issues/39
82+
"""
83+
84+
# Submit multiple measurements, with timestamp.
85+
data = {
86+
"1611082554": {
87+
"temperature": 21.42,
88+
"humidity": 41.55,
89+
},
90+
"1611082568": {
91+
"temperature": 42.84,
92+
"humidity": 83.1,
93+
},
94+
}
95+
yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json_compact, data)
96+
97+
# Wait for some time to process the message.
98+
yield sleep(PROCESS_DELAY)
99+
100+
# Proof that data arrived in InfluxDB.
101+
record = influx_sensors.get_record(index=0)
102+
assert record == {u'time': '2021-01-19T18:55:54Z', u'temperature': 21.42, u'humidity': 41.55}
103+
104+
record = influx_sensors.get_record(index=1)
105+
assert record == {u'time': '2021-01-19T18:56:08Z', u'temperature': 42.84, u'humidity': 83.1}
106+
107+
74108
@pytest_twisted.inlineCallbacks
75109
@pytest.mark.mqtt
76110
@pytest.mark.legacy

0 commit comments

Comments
 (0)