-
Tamas Gal authored
The issue where it was first mentioned is https://git.km3net.de/working_groups/daq/-/issues/147
Tamas Gal authoredThe issue where it was first mentioned is https://git.km3net.de/working_groups/daq/-/issues/147
ztplot.py 10.16 KiB
#!/usr/bin/env python
# coding=utf-8
# vim: ts=4 sw=4 et
"""
Creates z-t-plots for every DU.
Usage:
ztplot.py [options]
ztplot.py (-h | --help)
Options:
-l LIGIER_IP The IP of the ligier [default: 127.0.0.1].
-p LIGIER_PORT The port of the ligier [default: 5553].
-d DET_ID Detector ID [default: 29].
-o PLOT_DIR The directory to save the plot [default: /plots].
-h --help Show this screen.
"""
from __future__ import division
import km3pipe.style
from km3modules.plot import ztplot
from km3modules.common import LocalDBService
from km3modules.communication import ELOGService
from km3io.tools import is_3dmuon, is_3dshower, is_mxshower
import km3db
import km3pipe as kp
import numpy as np
import matplotlib.ticker as ticker
import matplotlib.pyplot as plt
from datetime import datetime
import os
import queue
import shutil
import threading
import time
from urllib.error import URLError
import matplotlib
# Force matplotlib to not use any Xwindows backend.
matplotlib.use('Agg')
km3pipe.style.use('km3pipe')
lock = threading.Lock()
class ZTPlot(kp.Module):
def configure(self):
self.plots_path = self.require('plots_path')
self.ytick_distance = self.get('ytick_distance', default=200)
self.min_dus = self.get('min_dus', default=1)
self.min_doms = self.get('min_doms', default=4)
self.det_id = self.require('det_id')
self.event_selection_table = self.get('event_selection_table',
default='event_selection')
self.logbook = self.get('logbook', default="Individual+Logbooks")
self.run_id = None
self.t0set = None
self.calib = None
self.max_z = None
self.last_plot_time = 0
self.lower_limits = {}
self.elog = self.get('elog', default=False)
self.sds = km3db.StreamDS()
self.index = 0
def prepare(self):
if not self.services["table_exists"](self.event_selection_table):
self.services["create_table"](self.event_selection_table, [
"overlays", "n_hits", "n_triggered_hits", "n_dus",
"plot_filename", "run_id", "det_id", "frame_index",
"trigger_counter", "utc_timestamp"
], [
"INT", "INT", "INT", "INT", "TEXT", "INT", "INT", "INT", "INT",
"INT"
])
self._update_lower_limits()
self.run = True
self.max_queue = 300
self.queue = queue.Queue()
self.thread = threading.Thread(target=self.plot, daemon=True)
self.thread.start()
def _update_lower_limits(self):
"""Update the lower limits for the Top-10 candidate selection"""
n_candidates = 10
for category in ["overlays", "n_hits", "n_triggered_hits"]:
lower_limits = self.services["query"](
"SELECT {cat} FROM {tab} ORDER BY {cat} DESC LIMIT {limit}".
format(cat=category,
tab=self.event_selection_table,
limit=n_candidates))
if len(lower_limits) < 10:
lower_limit = 0
else:
lower_limit = lower_limits[-1][0]
self.lower_limits[category] = lower_limit
self.cprint("Current limits for the Top-10: {}".format(
self.lower_limits))
def _update_calibration(self):
self.cprint("Updating calibration for run {}".format(self.run_id))
try:
self.calib = kp.calib.Calibration(det_id=self.det_id,
run=self.run_id)
except IndexError:
self.log.error("Unusable (probably empty) DETX received from the database, "
"falling back to the base DETX without any calibration and "
"retrying at run change.")
self.calib = kp.calib.Calibration(det_id=self.det_id)
except URLError as e:
self.log.error(
"Unable to update calibration, no connection to the DB, "
"retrying in one minute...\n{}".
format(e))
time.sleep(60)
self._update_calibration()
else:
self.max_z = round(np.max(self.calib.detector.pmts.pos_z) + 10, -1)
def process(self, blob):
if 'Hits' not in blob:
return blob
self.index += 1
event_info = blob['EventInfo']
run_id = event_info.run_id[0]
if run_id != self.run_id:
self.run_id = run_id
self._update_calibration()
hits = blob['Hits']
hits = self.calib.apply(hits)
n_triggered_dus = len(np.unique(hits[hits.triggered > 0].du))
n_triggered_doms = len(np.unique(hits[hits.triggered > 0].dom_id))
if n_triggered_dus < self.min_dus or n_triggered_doms < self.min_doms:
self.log.debug(f"Skipping event with {n_triggered_dus} DUs "
f"and {n_triggered_doms} DOMs.")
return blob
# print("Event queue size: {0}".format(self.queue.qsize()))
if self.queue.qsize() < self.max_queue:
raw_data = blob["CHData"]
self.queue.put((event_info, hits, raw_data))
else:
self.cprint("Skipping, queue is full...")
return blob
def plot(self):
while self.run:
try:
event_info, hits, raw_data = self.queue.get(timeout=50)
except queue.Empty:
continue
with lock:
self.create_plot(event_info, hits, raw_data)
def create_plot(self, event_info, hits, raw_data):
trigger_mask = event_info.trigger_mask[0]
det_id = event_info.det_id[0]
run_id = event_info.run_id[0]
frame_index = event_info.frame_index[0]
trigger_counter = event_info.trigger_counter[0]
utc_timestamp = event_info.utc_seconds[0]
overlays = event_info.overlays[0]
n_hits = len(hits)
n_triggered_hits = sum(hits.triggered)
# Check for new record
is_in_top10 = overlays > self.lower_limits[
'overlays'] or n_hits > self.lower_limits[
'n_hits'] or n_triggered_hits > self.lower_limits[
"n_triggered_hits"]
if (utc_timestamp - self.last_plot_time) < 60 and not is_in_top10:
self.log.debug("Skipping plot...")
return
self.cprint(self.__class__.__name__ + ": updating plot.")
dus = set(hits.du)
n_dus = len(dus)
grid_lines = self.calib.detector.pmts.pos_z[
(self.calib.detector.pmts.du == min(dus))
& (self.calib.detector.pmts.channel_id == 0)]
trigger_params = ' '.join([
trig
for trig, trig_check in (("MX", is_mxshower), ("3DM", is_3dmuon),
("3DS", is_3dshower))
if trig_check(int(trigger_mask))
])
try:
dt = datetime.utcfromtimestamp(utc_timestamp)
except OverflowError:
self.log.critical("OverflowError encountered when converting '%s' to "
"a datetime object." % utc_timestamp)
dt = "invalid date"
title = "z-t-Plot for DetID-{0} Run {1}, " \
"FrameIndex {2}, TriggerCounter {3}, Overlays {4}, " \
"Trigger: {5}\n{6} UTC".format(
det_id, run_id, frame_index, trigger_counter,
overlays, trigger_params, dt)
filename = 'ztplot'
f = os.path.join(self.plots_path, filename + '.png')
f_tmp = os.path.join(self.plots_path, filename + '_tmp.png')
fig = ztplot(hits,
filename=f_tmp,
title=title,
max_z=self.max_z,
ytick_distance=self.ytick_distance,
grid_lines=grid_lines,
n_dus=n_dus,
figsize=(16, 16))
shutil.move(f_tmp, f)
if is_in_top10:
self.cprint(
"New record! Overlays: {}, hits: {}, triggered hits: {}".
format(overlays, n_hits, n_triggered_hits))
base_filename = os.path.join(
self.plots_path,
"event_selection/ztplot_{:08d}_{:08d}_FI{}_TC{}".format(
det_id, run_id, frame_index, trigger_counter))
plot_filename = base_filename + ".png"
rawdata_filename = base_filename + ".dat"
self.services["insert_row"](self.event_selection_table, [
"overlays", "n_hits", "n_triggered_hits", "n_dus",
"plot_filename", "run_id", "det_id", "frame_index",
"trigger_counter", "utc_timestamp"
], [
overlays, n_hits, n_triggered_hits, n_dus, plot_filename,
run_id, det_id, frame_index, trigger_counter, utc_timestamp
])
shutil.copy(f, plot_filename)
with open(rawdata_filename, "wb") as fobj:
fobj.write(raw_data)
self._update_lower_limits()
if self.elog:
self.services['post_elog'](
logbook=self.logbook,
subject="New massive event!",
message="A new event has made it into the top 10!",
message_type="Monitoring",
author="Gal T",
files=[plot_filename])
plt.close(fig)
plt.close('all')
self.last_plot_time = utc_timestamp
def finish(self):
self.run = False
def main():
from docopt import docopt
args = docopt(__doc__)
det_id = int(args['-d'])
plots_path = args['-o']
ligier_ip = args['-l']
ligier_port = int(args['-p'])
pipe = kp.Pipeline()
pipe.attach(LocalDBService, thread_safety=False)
pipe.attach(ELOGService)
pipe.attach(kp.io.ch.CHPump,
host=ligier_ip,
port=ligier_port,
tags='IO_EVT, IO_SUM',
timeout=60 * 60 * 24 * 7,
max_queue=2000)
pipe.attach(kp.io.daq.DAQProcessor)
pipe.attach(ZTPlot, det_id=det_id, plots_path=plots_path, elog=False)
pipe.drain()
if __name__ == '__main__':
main()