Skip to content
Snippets Groups Projects
Commit c946facf authored by Tamas Gal's avatar Tamas Gal :speech_balloon:
Browse files

Merge branch 'master' of git.km3net.de:km3py/km3mon

parents dfac9304 f765f911
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,7 @@ from collections import defaultdict, deque, OrderedDict
from itertools import chain
import sys
from io import BytesIO
from os.path import join
from os.path import join, exists
import shutil
import time
import threading
......@@ -47,13 +47,19 @@ km3pipe.style.use('km3pipe')
class TriggerRate(kp.Module):
def configure(self):
self.plots_path = self.require('plots_path')
self.data_path = self.get('data_path', default='data')
self.interval = self.get("interval",
default=self.trigger_rate_sampling_period())
self.filename = self.get("filename", default="trigger_rates")
self.with_minor_ticks = self.get("with_minor_ticks", default=False)
print("Update interval: {}s".format(self.interval))
self.trigger_counts = defaultdict(int)
self.trigger_rates = OrderedDict()
self._trigger_types = ["Overall", "3DMuon", "MXShower", "3DShower"]
self.trigger_rates_fobj = None
self.initialise_data_logging()
self.styles = {
"xfmt": md.DateFormatter('%Y-%m-%d %H:%M'),
......@@ -65,17 +71,27 @@ class TriggerRate(kp.Module):
}
queue_len = int(60 * 24 / (self.interval / 60))
for trigger in ["Overall", "3DMuon", "MXShower", "3DShower"]:
for trigger in self._trigger_types:
self.trigger_rates[trigger] = deque(maxlen=queue_len)
self.run = True
self.thread = threading.Thread(target=self.plot).start()
threading.Thread(target=self.plot).start()
self.lock = threading.Lock()
self.run_changes = []
self.current_run_id = 0
self.det_id = 0
def initialise_data_logging(self):
filename = join(self.data_path, "trigger_rates.csv")
if not exists(filename):
self.trigger_rates_fobj = open(filename, "w")
self.trigger_rates_fobj.write('timestamp,' +
','.join(self._trigger_types) + '\n')
else:
self.trigger_rates_fobj = open(filename, "a")
self.trigger_rates_fobj.flush()
def process(self, blob):
if not str(blob['CHPrefix'].tag) == 'IO_EVT':
return blob
......@@ -125,18 +141,35 @@ class TriggerRate(kp.Module):
def plot(self):
while self.run:
time.sleep(self.interval)
timestamp, trigger_rates = self.calculate_trigger_rates()
self.write_trigger_rates(timestamp, trigger_rates)
self.create_plot()
def create_plot(self):
print('\n' + self.__class__.__name__ + ": updating plot.")
def write_trigger_rates(self, timestamp, trigger_rates):
entry = f"{timestamp}"
for trigger_type in self._trigger_types:
try:
trigger_rate = trigger_rates[trigger_type]
except KeyError:
trigger_rate = 0
entry += f",{trigger_rate}"
entry += '\n'
self.trigger_rates_fobj.write(entry)
self.trigger_rates_fobj.flush()
def calculate_trigger_rates(self):
timestamp = datetime.utcnow()
trigger_rates = {}
with self.lock:
for trigger, n_events in self.trigger_counts.items():
trigger_rate = n_events / self.interval
self.trigger_rates[trigger].append((timestamp, trigger_rate))
trigger_rates[trigger] = trigger_rate
self.trigger_counts = defaultdict(int)
return timestamp.timestamp(), trigger_rates
def create_plot(self):
print('\n' + self.__class__.__name__ + ": updating plot.")
fig, ax = plt.subplots(figsize=(16, 4))
......@@ -208,9 +241,8 @@ class TriggerRate(kp.Module):
return 180
def finish(self):
self.trigger_rates_fobj.close()
self.run = False
if self.thread is not None:
self.thread.stop()
def main():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment