Skip to content
Snippets Groups Projects
Verified Commit abe83add authored by Tamas Gal's avatar Tamas Gal :speech_balloon:
Browse files

Black

parent 4ffcd7cc
No related branches found
No related tags found
No related merge requests found
Pipeline #23707 passed
......@@ -29,12 +29,14 @@ import threading
import numpy as np
import matplotlib
matplotlib.use('Agg')
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as md
import seaborn as sns
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
import km3db
......@@ -42,16 +44,19 @@ import km3pipe as kp
from km3pipe.io.daq import TMCHData
from km3modules.ahrs import fit_ahrs, get_latest_ahrs_calibration
import km3pipe.style
km3pipe.style.use('km3pipe')
km3pipe.style.use("km3pipe")
class CalibrateAHRS(kp.Module):
def configure(self):
self.plots_path = self.require('plots_path')
det_id = self.require('det_id')
self.plots_path = self.require("plots_path")
det_id = self.require("det_id")
det_oid = km3db.tools.todetoid(det_id)
self.time_range = self.get('time_range', default=24 * 3) # hours
self.n_points_in_graph = self.get('n_points_in_graph', default=2000) # x-axis resolution (per floor)
self.time_range = self.get("time_range", default=24 * 3) # hours
self.n_points_in_graph = self.get(
"n_points_in_graph", default=2000
) # x-axis resolution (per floor)
self.data = {}
self.dus = set()
......@@ -63,13 +68,20 @@ class CalibrateAHRS(kp.Module):
self.cuckoo_stats = kp.time.Cuckoo(300, self._show_stats)
# 10Hz monitoring (including bases)
n_messages = int(10 * self.detector.n_doms * self.time_range * 60**2 + self.detector.n_dus)
n_messages = int(
10 * self.detector.n_doms * self.time_range * 60 ** 2 + self.detector.n_dus
)
self.fraction_to_keep = self.n_points_in_graph * self.detector.n_doms / n_messages
self.fraction_to_keep = (
self.n_points_in_graph * self.detector.n_doms / n_messages
)
self.cprint(f"Fraction to keep: {self.fraction_to_keep}")
self.queue_size = int(self.n_points_in_graph * 1.1) # a bit of safety margin due to randomness
self.cprint(f"Queue size for each module: {self.queue_size}, time range: {self.time_range} hours")
self.queue_size = int(
self.n_points_in_graph * 1.1
) # a bit of safety margin due to randomness
self.cprint(
f"Queue size for each module: {self.queue_size}, time range: {self.time_range} hours"
)
self.lock = threading.Lock()
self.index = 0
......@@ -80,17 +92,19 @@ class CalibrateAHRS(kp.Module):
for du, data in self.data.items():
messages.append(f"DU {du}: ")
for floor, times in data["times"].items():
messages.append(f" floor {floor}: {len(times)} ({times[0]}, {times[-1]})")
messages.append(
f" floor {floor}: {len(times)} ({times[0]}, {times[-1]})"
)
self.cprint("\n".join(messages))
def _register_du(self, du):
"""Create data cache for DU"""
self.data[du] = {}
for ahrs_param in ('yaw', 'pitch', 'roll'):
for ahrs_param in ("yaw", "pitch", "roll"):
self.data[du][ahrs_param] = defaultdict(
partial(deque, maxlen=self.queue_size))
self.data[du]['times'] = defaultdict(
partial(deque, maxlen=self.queue_size))
partial(deque, maxlen=self.queue_size)
)
self.data[du]["times"] = defaultdict(partial(deque, maxlen=self.queue_size))
self.dus.add(du)
def process(self, blob):
......@@ -101,7 +115,7 @@ class CalibrateAHRS(kp.Module):
self.cuckoo_stats()
now = datetime.utcnow()
tmch_data = TMCHData(io.BytesIO(blob['CHData']))
tmch_data = TMCHData(io.BytesIO(blob["CHData"]))
dom_id = tmch_data.dom_id
clb = self.clbmap.dom_ids[dom_id]
if clb.floor == 0:
......@@ -119,13 +133,16 @@ class CalibrateAHRS(kp.Module):
if du not in self.dus:
self._register_du(du)
cyaw, cpitch, croll = fit_ahrs(tmch_data.A, tmch_data.H, *calib)
self.cuckoo_log("DU{}-DOM{} (random pick): calibrated yaw={}".format(
clb.du, clb.floor, cyaw))
self.cuckoo_log(
"DU{}-DOM{} (random pick): calibrated yaw={}".format(
clb.du, clb.floor, cyaw
)
)
with self.lock:
self.data[du]['yaw'][clb.floor].append(cyaw)
self.data[du]['pitch'][clb.floor].append(cpitch)
self.data[du]['roll'][clb.floor].append(croll)
self.data[du]['times'][clb.floor].append(now)
self.data[du]["yaw"][clb.floor].append(cyaw)
self.data[du]["pitch"][clb.floor].append(cpitch)
self.data[du]["roll"][clb.floor].append(croll)
self.data[du]["times"][clb.floor].append(now)
self.cuckoo.msg()
return blob
......@@ -133,61 +150,70 @@ class CalibrateAHRS(kp.Module):
def create_plot(self):
self.cprint(self.__class__.__name__ + ": updating plot.")
if self.time_range > 24:
xfmt = md.DateFormatter('%Y-%m-%d %H:%M')
xfmt = md.DateFormatter("%Y-%m-%d %H:%M")
else:
xfmt = md.DateFormatter('%H:%M')
xlim = (datetime.utcfromtimestamp(time.time() -
int(self.time_range * 60 * 60)),
datetime.utcnow())
xfmt = md.DateFormatter("%H:%M")
xlim = (
datetime.utcfromtimestamp(time.time() - int(self.time_range * 60 * 60)),
datetime.utcnow(),
)
for du in self.dus:
data = self.data[du]
for ahrs_param in data.keys():
fig, ax = plt.subplots(figsize=(16, 6))
sns.set_palette("husl", 18)
ax.set_title("AHRS {} Calibration on DU{}\n{}".format(
ahrs_param, du, datetime.utcnow()))
ax.set_title(
"AHRS {} Calibration on DU{}\n{}".format(
ahrs_param, du, datetime.utcnow()
)
)
ax.set_xlabel("UTC time")
ax.xaxis.set_major_formatter(xfmt)
ax.set_ylabel(ahrs_param)
with self.lock:
for floor in sorted(data[ahrs_param].keys()):
ax.plot(data['times'][floor],
data[ahrs_param][floor],
marker='.',
linestyle='none',
label="Floor {}".format(floor))
ax.plot(
data["times"][floor],
data[ahrs_param][floor],
marker=".",
linestyle="none",
label="Floor {}".format(floor),
)
ax.set_xlim(xlim)
lgd = plt.legend(bbox_to_anchor=(1.005, 1),
loc=2,
borderaxespad=0.)
lgd = plt.legend(bbox_to_anchor=(1.005, 1), loc=2, borderaxespad=0.0)
fig.tight_layout()
plt.savefig(os.path.join(
self.plots_path,
ahrs_param + '_calib_du{}.png'.format(du)),
bbox_extra_artists=(lgd, ),
bbox_inches='tight')
plt.close('all')
plt.savefig(
os.path.join(
self.plots_path, ahrs_param + "_calib_du{}.png".format(du)
),
bbox_extra_artists=(lgd,),
bbox_inches="tight",
)
plt.close("all")
def main():
from docopt import docopt
args = docopt(__doc__)
det_id = int(args['-d'])
plots_path = args['-o']
ligier_ip = args['-l']
ligier_port = int(args['-p'])
det_id = int(args["-d"])
plots_path = args["-o"]
ligier_ip = args["-l"]
ligier_port = int(args["-p"])
pipe = kp.Pipeline()
pipe.attach(kp.io.ch.CHPump,
host=ligier_ip,
port=ligier_port,
tags='IO_MONIT',
timeout=60 * 60 * 24 * 7,
max_queue=2000)
pipe.attach(
kp.io.ch.CHPump,
host=ligier_ip,
port=ligier_port,
tags="IO_MONIT",
timeout=60 * 60 * 24 * 7,
max_queue=2000,
)
pipe.attach(CalibrateAHRS, det_id=det_id, plots_path=plots_path)
pipe.drain()
if __name__ == '__main__':
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment