Skip to content
Snippets Groups Projects
Verified Commit 4ffcd7cc authored by Tamas Gal's avatar Tamas Gal :speech_balloon:
Browse files

Improve queue handling

parent 9e50255e
No related branches found
No related tags found
No related merge requests found
......@@ -23,6 +23,7 @@ from collections import deque, defaultdict
from functools import partial
import io
import os
import random
import time
import threading
......@@ -50,21 +51,38 @@ class CalibrateAHRS(kp.Module):
det_id = self.require('det_id')
det_oid = km3db.tools.todetoid(det_id)
self.time_range = self.get('time_range', default=24 * 3) # hours
self.detector = kp.hardware.Detector(det_id=det_id)
self.n_points_in_graph = self.get('n_points_in_graph', default=2000) # x-axis resolution (per floor)
self.data = {}
self.dus = set()
self.detector = kp.hardware.Detector(det_id=det_id)
self.clbmap = km3db.CLBMap(det_oid=det_oid)
self.cuckoo = kp.time.Cuckoo(60, self.create_plot)
self.cuckoo = kp.time.Cuckoo(60, self.create_plot) # plot update interval [s]
self.cuckoo_log = kp.time.Cuckoo(10, self.cprint)
self.cuckoo_stats = kp.time.Cuckoo(300, self._show_stats)
self.data = {}
self.scale_down_factor = 1000
self.queue_size = int(self.detector.n_doms * 10 * self.time_range * 60**2 / self.detector.n_dus / self.scale_down_factor)
# 10Hz monitoring (including bases)
n_messages = int(10 * self.detector.n_doms * self.time_range * 60**2 + self.detector.n_dus)
self.fraction_to_keep = self.n_points_in_graph * self.detector.n_doms / n_messages
self.cprint(f"Fraction to keep: {self.fraction_to_keep}")
self.queue_size = int(self.n_points_in_graph * 1.1) # a bit of safety margin due to randomness
self.cprint(f"Queue size for each module: {self.queue_size}, time range: {self.time_range} hours")
self.lock = threading.Lock()
self.index = 0
def _show_stats(self):
"""Print some data statistics"""
messages = ["Recorded data:"]
for du, data in self.data.items():
messages.append(f"DU {du}: ")
for floor, times in data["times"].items():
messages.append(f" floor {floor}: {len(times)} ({times[0]}, {times[-1]})")
self.cprint("\n".join(messages))
def _register_du(self, du):
"""Create data cache for DU"""
self.data[du] = {}
......@@ -77,8 +95,11 @@ class CalibrateAHRS(kp.Module):
def process(self, blob):
self.index += 1
if self.index % self.scale_down_factor != 0:
if random.random() > self.fraction_to_keep:
return blob
self.cuckoo_stats()
now = datetime.utcnow()
tmch_data = TMCHData(io.BytesIO(blob['CHData']))
dom_id = tmch_data.dom_id
......@@ -116,7 +137,7 @@ class CalibrateAHRS(kp.Module):
else:
xfmt = md.DateFormatter('%H:%M')
xlim = (datetime.utcfromtimestamp(time.time() -
self.time_range * 60 * 60),
int(self.time_range * 60 * 60)),
datetime.utcnow())
for du in self.dus:
data = self.data[du]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment