Skip to content
Snippets Groups Projects
trigger_rates.py 9.15 KiB
Newer Older
Tamas Gal's avatar
Tamas Gal committed
#!/usr/bin/env python
# coding=utf-8
# Filename: trigger_rates.py
# Author: Tamas Gal <tgal@km3net.de>
# vim: ts=4 sw=4 et
"""
Monitors trigger rates.

Usage:
    trigger_rates.py [options]
    trigger_rates.py (-h | --help)

Options:
    -l LIGIER_IP    The IP of the ligier [default: 127.0.0.1].
    -p LIGIER_PORT  The port of the ligier [default: 5553].
Tamas Gal's avatar
Tamas Gal committed
    -o PLOT_DIR     The directory to save the plot [default: plots].
Tamas Gal's avatar
Tamas Gal committed
    -h --help       Show this screen.

"""
from __future__ import division, print_function

from datetime import datetime
from collections import defaultdict, deque, OrderedDict
Tamas Gal's avatar
Tamas Gal committed
from itertools import chain
Tamas Gal's avatar
Tamas Gal committed
import sys
from io import BytesIO
Tamas Gal's avatar
Tamas Gal committed
from os.path import join, exists
Tamas Gal's avatar
Tamas Gal committed
import shutil
import time
import threading

Tamas Gal's avatar
Tamas Gal committed
import matplotlib
matplotlib.use("Agg")
Tamas Gal's avatar
Tamas Gal committed
import matplotlib.pyplot as plt
import matplotlib.dates as md

import km3pipe as kp
from km3pipe.config import Config
Tamas Gal's avatar
Tamas Gal committed
from km3pipe.io.daq import (DAQPreamble, DAQEvent, is_3dshower, is_3dmuon,
                            is_mxshower)
Tamas Gal's avatar
Tamas Gal committed
import km3pipe.style

VERSION = "1.0"
km3pipe.style.use('km3pipe')


class TriggerRate(kp.Module):
    def configure(self):
        self.plots_path = self.require('plots_path')
Tamas Gal's avatar
Tamas Gal committed
        self.data_path = self.get('data_path', default='data')
        self.interval = self.get("interval",
                                 default=self.trigger_rate_sampling_period())
Tamas Gal's avatar
Tamas Gal committed
        self.filename = self.get("filename", default="trigger_rates")
        self.with_minor_ticks = self.get("with_minor_ticks", default=False)
Tamas Gal's avatar
Tamas Gal committed
        print("Update interval: {}s".format(self.interval))
        self.trigger_counts = defaultdict(int)
        self.trigger_rates = OrderedDict()
Tamas Gal's avatar
Tamas Gal committed
        self._trigger_types = ["Overall", "3DMuon", "MXShower", "3DShower"]
        self.trigger_rates_fobj = None

        self.initialise_data_logging()
Tamas Gal's avatar
Tamas Gal committed

        self.styles = {
            "xfmt": md.DateFormatter('%Y-%m-%d %H:%M'),
            "general": dict(markersize=6, linestyle=':', linewidth=1),
            "Overall": dict(marker='D', color='tomato', markeredgewidth=1),
            "3DMuon": dict(marker='X', color='dodgerblue'),
            "MXShower": dict(marker='v', color='orange'),
            "3DShower": dict(marker='^', color='olivedrab'),
Tamas Gal's avatar
Tamas Gal committed
        queue_len = int(60 * 24 / (self.interval / 60))
Tamas Gal's avatar
Tamas Gal committed
        for trigger in self._trigger_types:
Tamas Gal's avatar
Tamas Gal committed
            self.trigger_rates[trigger] = deque(maxlen=queue_len)

Tamas Gal's avatar
Tamas Gal committed
        self.run = True
Tamas Gal's avatar
Tamas Gal committed
        threading.Thread(target=self.plot).start()
Tamas Gal's avatar
Tamas Gal committed
        self.lock = threading.Lock()

Tamas Gal's avatar
Tamas Gal committed
        self.run_changes = []
        self.current_run_id = 0
Tamas Gal's avatar
Tamas Gal committed
        self.det_id = 0
Tamas Gal's avatar
Tamas Gal committed
    def initialise_data_logging(self):
        filename = join(self.data_path, "trigger_rates.csv")
        if not exists(filename):
            self.trigger_rates_fobj = open(filename, "w")
            self.trigger_rates_fobj.write(','.join(self._trigger_types) + '\n')
        else:
            self.trigger_rates_fobj = open(filename, "a")
        self.trigger_rates_fobj.flush()
Tamas Gal's avatar
Tamas Gal committed
    def process(self, blob):
        if not str(blob['CHPrefix'].tag) == 'IO_EVT':
            return blob
        sys.stdout.write('.')
        sys.stdout.flush()

        data = blob['CHData']
        data_io = BytesIO(data)
        preamble = DAQPreamble(file_obj=data_io)  # noqa
        event = DAQEvent(file_obj=data_io)
Tamas Gal's avatar
Tamas Gal committed
        self.det_id = event.header.det_id
Tamas Gal's avatar
Tamas Gal committed
        if event.header.run > self.current_run_id:
            self.current_run_id = event.header.run
            self._log_run_change()
Tamas Gal's avatar
Tamas Gal committed
        tm = event.trigger_mask
        with self.lock:
            self.trigger_counts["Overall"] += 1
            self.trigger_counts["3DShower"] += is_3dshower(tm)
            self.trigger_counts["MXShower"] += is_mxshower(tm)
            self.trigger_counts["3DMuon"] += is_3dmuon(tm)

        print(self.trigger_counts)

        return blob

Tamas Gal's avatar
Tamas Gal committed
    def _log_run_change(self):
Tamas Gal's avatar
Tamas Gal committed
        self.print("New run: %s" % self.current_run_id)
Tamas Gal's avatar
Tamas Gal committed
        now = datetime.utcnow()
        self.run_changes.append((now, self.current_run_id))

Tamas Gal's avatar
Tamas Gal committed
    def _get_run_changes_to_plot(self):
        self.print("Checking run changes out of range")
        overall_rates = self.trigger_rates['Overall']
        if not overall_rates:
            self.print("No trigger rates logged  yet, nothing to remove.")
            return
Tamas Gal's avatar
Tamas Gal committed
        self.print("  all:     {}".format(self.run_changes))
        run_changes_to_plot = []
        min_timestamp = min(overall_rates)[0]
Tamas Gal's avatar
Tamas Gal committed
        self.print("  earliest timestamp to plot: {}".format(min_timestamp))
Tamas Gal's avatar
Tamas Gal committed
        for timestamp, run in self.run_changes:
            if timestamp > min_timestamp:
Tamas Gal's avatar
Tamas Gal committed
                run_changes_to_plot.append((timestamp, run))
        self.print("  to plot: {}".format(run_changes_to_plot))
        return run_changes_to_plot
Tamas Gal's avatar
Tamas Gal committed
    def plot(self):
        while self.run:
            time.sleep(self.interval)
Tamas Gal's avatar
Tamas Gal committed
            timestamp, trigger_rates = self.calculate_trigger_rates()
            self.write_trigger_rates(timestamp, trigger_rates)
Tamas Gal's avatar
Tamas Gal committed
            self.create_plot()

Tamas Gal's avatar
Tamas Gal committed
    def write_trigger_rates(self, timestamp, trigger_rates):
        entry = f"{timestamp}"
        for trigger_type in self._trigger_types:
Tamas Gal's avatar
Tamas Gal committed
            try:
                trigger_rate = trigger_rates[trigger_type]
            except KeyError:
                trigger_rate = 0
            entry += f",{trigger_rate}"
Tamas Gal's avatar
Tamas Gal committed
        entry += '\n'
        self.trigger_rates_fobj.write(entry)
        self.trigger_rates_fobj.flush()
Tamas Gal's avatar
Tamas Gal committed
    def calculate_trigger_rates(self):
Tamas Gal's avatar
Tamas Gal committed
        timestamp = datetime.utcnow()
Tamas Gal's avatar
Tamas Gal committed
        trigger_rates = {}
Tamas Gal's avatar
Tamas Gal committed
        trigger_rates['Overall'] = 0
Tamas Gal's avatar
Tamas Gal committed
        with self.lock:
            for trigger, n_events in self.trigger_counts.items():
                trigger_rate = n_events / self.interval
                self.trigger_rates[trigger].append((timestamp, trigger_rate))
Tamas Gal's avatar
Tamas Gal committed
                trigger_rates[trigger] = trigger_rate
Tamas Gal's avatar
Tamas Gal committed
                trigger_rates['Overall'] += trigger_rate
Tamas Gal's avatar
Tamas Gal committed
            self.trigger_counts = defaultdict(int)
Tamas Gal's avatar
Tamas Gal committed
        return timestamp.timestamp(), trigger_rates
Tamas Gal's avatar
Tamas Gal committed

    def create_plot(self):
        print('\n' + self.__class__.__name__ + ": updating plot.")
Tamas Gal's avatar
Tamas Gal committed

        fig, ax = plt.subplots(figsize=(16, 4))

        for trigger, rates in self.trigger_rates.items():
            if not rates:
                self.log.warning("Empty rates, skipping...")
                continue
Tamas Gal's avatar
Tamas Gal committed
            timestamps, trigger_rates = zip(*rates)
            ax.plot(timestamps,
                    trigger_rates,
                    **self.styles[trigger],
                    **self.styles['general'],
                    label=trigger)
Tamas Gal's avatar
Tamas Gal committed
        run_changes_to_plot = self._get_run_changes_to_plot()
        self.print("Recorded run changes: {}".format(run_changes_to_plot))
Tamas Gal's avatar
Tamas Gal committed
        all_rates = [r for d, r in chain(*self.trigger_rates.values())]
Tamas Gal's avatar
Tamas Gal committed
        if not all_rates:
Tamas Gal's avatar
Tamas Gal committed
            self.log.warning("Empty rates, skipping...")
Tamas Gal's avatar
Tamas Gal committed
            return
Tamas Gal's avatar
Tamas Gal committed
        min_trigger_rate = min(all_rates)
        max_trigger_rate = max(all_rates)
Tamas Gal's avatar
Tamas Gal committed
        for run_start, run in run_changes_to_plot:
            plt.text(run_start, (min_trigger_rate + max_trigger_rate) / 2,
                     "\nRUN %s  " % run,
                     rotation=60,
                     verticalalignment='top',
                     fontsize=8,
                     color='gray')
            ax.axvline(run_start, color='#ff0f5b', linestyle='--',
                       alpha=0.8)  # added
Tamas Gal's avatar
Tamas Gal committed
        ax.set_title("Trigger Rates for DetID-{0}\n{1} UTC".format(
            self.det_id,
Tamas Gal's avatar
Tamas Gal committed
            datetime.utcnow().strftime("%c")))
Tamas Gal's avatar
Tamas Gal committed
        ax.set_xlabel("time")
        ax.set_ylabel("trigger rate [Hz]")
        ax.xaxis.set_major_formatter(self.styles["xfmt"])
        ax.grid(True, which='minor')
        if self.with_minor_ticks:
            ax.minorticks_on()
        plt.legend()

        fig.tight_layout()

        filename = join(self.plots_path, self.filename + '_lin.png')
        filename_tmp = join(self.plots_path, self.filename + '_lin_tmp.png')
        plt.savefig(filename_tmp, dpi=120, bbox_inches="tight")
        shutil.move(filename_tmp, filename)

        try:
            ax.set_yscale('log')
        except ValueError:
            pass

        filename = join(self.plots_path, self.filename + '.png')
        filename_tmp = join(self.plots_path, self.filename + '_tmp.png')
        plt.savefig(filename_tmp, dpi=120, bbox_inches="tight")
        shutil.move(filename_tmp, filename)

        plt.close('all')
        print("Plot updated at '{}'.".format(filename))

    def trigger_rate_sampling_period(self):
        try:
            return int(Config().get("Monitoring",
                                    "trigger_rate_sampling_period"))
        except (TypeError, ValueError):
            return 180

    def finish(self):
Tamas Gal's avatar
Tamas Gal committed
        self.trigger_rates_fobj.close()
Tamas Gal's avatar
Tamas Gal committed
        self.run = False


def main():
    from docopt import docopt
    args = docopt(__doc__, version=VERSION)

    plots_path = args['-o']
    ligier_ip = args['-l']
    ligier_port = int(args['-p'])

    pipe = kp.Pipeline()
    pipe.attach(kp.io.ch.CHPump,
                host=ligier_ip,
                port=ligier_port,
                tags='IO_EVT',
                timeout=60 * 60 * 24 * 7,
                max_queue=200000)
Tamas Gal's avatar
Tamas Gal committed
    pipe.attach(TriggerRate, interval=300, plots_path=plots_path)
    pipe.drain()


if __name__ == '__main__':
    main()