From 2c5bb243bd2fb3501f120c151c31a37e79d79c96 Mon Sep 17 00:00:00 2001
From: Tamas Gal <tgal@km3net.de>
Date: Sun, 18 Mar 2018 21:55:39 +0100
Subject: [PATCH] Add trigger rates script

---
 scripts/trigger_rates.py | 183 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 183 insertions(+)
 create mode 100644 scripts/trigger_rates.py

diff --git a/scripts/trigger_rates.py b/scripts/trigger_rates.py
new file mode 100644
index 0000000..7dd13a4
--- /dev/null
+++ b/scripts/trigger_rates.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python
+# coding=utf-8
+# Filename: trigger_rates.py
+# Author: Tamas Gal <tgal@km3net.de>
+# vim: ts=4 sw=4 et
+"""
+Monitors trigger rates.
+
+Usage:
+    trigger_rates.py [options]
+    trigger_rates.py (-h | --help)
+
+Options:
+    -l LIGIER_IP    The IP of the ligier [default: 127.0.0.1].
+    -p LIGIER_PORT  The port of the ligier [default: 5553].
+    -o PLOT_DIR     The directory to save the plot [default: www/plots].
+    -h --help       Show this screen.
+
+"""
+from __future__ import division, print_function
+
+from datetime import datetime
+from collections import defaultdict, deque, OrderedDict
+import sys
+from io import BytesIO
+from os.path import join
+import shutil
+import time
+import threading
+
+import matplotlib.pyplot as plt
+import matplotlib.dates as md
+
+import km3pipe as kp
+from km3pipe.config import Config
+from km3pipe.io.daq import (DAQPreamble, DAQEvent,
+                            is_3dshower, is_3dmuon, is_mxshower)
+import km3pipe.style
+
+
+VERSION = "1.0"
+km3pipe.style.use('km3pipe')
+
+
+class TriggerRate(kp.Module):
+    def configure(self):
+        self.plots_path = self.require('plots_path')
+        self.interval = self.get("interval",
+                                 default=self.trigger_rate_sampling_period())
+        self.filename = self.get("filename", default="trigger_rates")
+        self.with_minor_ticks = self.get("with_minor_ticks", default=False)
+        print("Update interval: {}s".format(self.interval))
+        self.trigger_counts = defaultdict(int)
+        self.trigger_rates = OrderedDict()
+
+        self.styles = {
+            "xfmt": md.DateFormatter('%Y-%m-%d %H:%M'),
+            "general": dict(markersize=6,  linestyle='None'),
+            "Overall": dict(marker='D',
+                            markerfacecolor='None',
+                            markeredgecolor='tomato',
+                            markeredgewidth=1),
+            "3DMuon": dict(marker='X', markerfacecolor='dodgerblue'),
+            "MXShower": dict(marker='v', markerfacecolor='orange'),
+            "3DShower": dict(marker='^', markerfacecolor='olivedrab'),
+        }
+
+        queue_len = int(60*24/(self.interval/60))
+        for trigger in ["Overall", "3DMuon", "MXShower", "3DShower"]:
+            self.trigger_rates[trigger] = deque(maxlen=queue_len)
+
+        self.thread = threading.Thread(target=self.plot).start()
+        self.lock = threading.Lock()
+        self.run = True
+
+    def process(self, blob):
+        if not str(blob['CHPrefix'].tag) == 'IO_EVT':
+            return blob
+        sys.stdout.write('.')
+        sys.stdout.flush()
+
+        data = blob['CHData']
+        data_io = BytesIO(data)
+        preamble = DAQPreamble(file_obj=data_io)  # noqa
+        event = DAQEvent(file_obj=data_io)
+        tm = event.trigger_mask
+        with self.lock:
+            self.trigger_counts["Overall"] += 1
+            self.trigger_counts["3DShower"] += is_3dshower(tm)
+            self.trigger_counts["MXShower"] += is_mxshower(tm)
+            self.trigger_counts["3DMuon"] += is_3dmuon(tm)
+
+        print(self.trigger_counts)
+
+        return blob
+
+    def plot(self):
+        while self.run:
+            time.sleep(self.interval)
+            self.create_plot()
+
+    def create_plot(self):
+        print('\n' + self.__class__.__name__ + ": updating plot.")
+
+        timestamp = datetime.utcnow()
+
+        with self.lock:
+            for trigger, n_events in self.trigger_counts.items():
+                trigger_rate = n_events / self.interval
+                self.trigger_rates[trigger].append((timestamp, trigger_rate))
+            self.trigger_counts = defaultdict(int)
+
+        fig, ax = plt.subplots(figsize=(16, 4))
+
+        for trigger, rates in self.trigger_rates.items():
+            timestamps, trigger_rates = zip(*rates)
+            ax.plot(timestamps, trigger_rates,
+                    **self.styles[trigger],
+                    **self.styles['general'],
+                    label=trigger)
+        ax.set_title("Trigger Rates\n{0} UTC"
+                     .format(datetime.utcnow().strftime("%c")))
+        ax.set_xlabel("time")
+        ax.set_ylabel("trigger rate [Hz]")
+        ax.xaxis.set_major_formatter(self.styles["xfmt"])
+        ax.grid(True, which='minor')
+        if self.with_minor_ticks:
+            ax.minorticks_on()
+        plt.legend()
+
+        fig.tight_layout()
+
+        filename = join(self.plots_path, self.filename + '_lin.png')
+        filename_tmp = join(self.plots_path, self.filename + '_lin_tmp.png')
+        plt.savefig(filename_tmp, dpi=120, bbox_inches="tight")
+        shutil.move(filename_tmp, filename)
+
+        try:
+            ax.set_yscale('log')
+        except ValueError:
+            pass
+
+        filename = join(self.plots_path, self.filename + '.png')
+        filename_tmp = join(self.plots_path, self.filename + '_tmp.png')
+        plt.savefig(filename_tmp, dpi=120, bbox_inches="tight")
+        shutil.move(filename_tmp, filename)
+
+        plt.close('all')
+        print("Plot updated at '{}'.".format(filename))
+
+    def trigger_rate_sampling_period(self):
+        try:
+            return int(Config().get("Monitoring",
+                                    "trigger_rate_sampling_period"))
+        except (TypeError, ValueError):
+            return 180
+
+    def finish(self):
+        self.run = False
+        if self.thread is not None:
+            self.thread.stop()
+
+
+def main():
+    from docopt import docopt
+    args = docopt(__doc__, version=VERSION)
+
+    plots_path = args['-o']
+    ligier_ip = args['-l']
+    ligier_port = int(args['-p'])
+
+    pipe = kp.Pipeline()
+    pipe.attach(kp.io.ch.CHPump, host=ligier_ip,
+                port=ligier_port,
+                tags='IO_EVT',
+                timeout=60*60*24*7,
+                max_queue=200000)
+    pipe.attach(TriggerRate, interval=300, plots_path=plots_path)
+    pipe.drain()
+
+
+if __name__ == '__main__':
+    main()
-- 
GitLab