Skip to content
Snippets Groups Projects
km3beam.py 3.97 KiB
Newer Older
Massimiliano Lincetto's avatar
Massimiliano Lincetto committed
import numpy as np
import scipy.signal as ssg

from functools import partial

from tqdm import tqdm_notebook as tqdm

"""
    Calculates distance(s) between a point 'p' and a(n array of) point(s) 'q'
    as the norm of the difference vector(s) 'pq'
    m = number of values    
    n = number of coordinates
    p ~ (1, n) - q ~ (m, n) => pq ~ (m, n)
"""
def dist(p, q):
    pq = p - q
    axis = pq.ndim - 1 # so clever! ;)
    return np.linalg.norm(pq, axis=axis)


# envelope for the chirp
def window(t, t1):
    return np.exp(-(2*t/t1)**100)

# windowed chirp
def wchirp(t, f0, t1, f1):
    return window(t, t1) * ssg.chirp(t, f0, t1, f1)

# cross correlation function
def xcorr(u1, u2):
    return np.convolve(u1, u2[::-1], 'same')

# returns a function-object with fixed chirp parameters
def chirp_template(f0, t1, f1):
    return partial(wchirp, f0=f0, t1=t1, f1=f1)

class BeamStreamer():
    O  = (0,0,0) # origin of coordinates
    c0 = 1500   # speed of sound [m/s]  
    
    def __init__(self, f_s, footprint, beacon, noise_level = 0.0, t_margin = 0.0):
        self.f_s         = f_s
        self.beacon      = beacon
        self.footprint   = footprint
        self.calc_delays(footprint)
        self.init_timebase(t_margin)
        self.reset_noise(noise_level)
        
    def calc_delays(self, footprint):
        self.distances = dist(footprint, self.beacon) - dist(self.O, self.beacon) 
        
    def init_timebase(self, t_margin):
        self.T   = 1.5 * (np.max(self.delays) + t_margin)
        self.N   = int(np.ceil(self.T * self.f_s))
        self.T   = self.N / self.f_s
        self.n_s = 1 + 4 * self.N
        self.timebase = np.linspace(-2 * self.T, 2 * self.T, self.n_s)
        self.sect = slice(self.N, 1 + 3 * self.N)
        
    def reset_noise(self, noise_level=0.0):
        shape = (len(self.delays), self.n_s)
        self.streams = np.random.normal(0.0, noise_level, shape)
        
    def sim_signals(self, template):
        self.reference = template(self.timebase)
        for stream, delay in zip(tqdm(self.streams), self.delays):
            stream[self.sect] += template(self.timebase[self.sect] - delay)
    
    def beamform(self, nominal_footprint, probe):
        W = np.zeros_like(self.timebase[self.sect])
        shifts = self.f_s * ((dist(probe, nominal_footprint) - dist(probe, 0)) / self.c0)
        shifts = shifts.astype('int')
        for stream, shift in zip(self.streams, shifts):
            shf = slice(self.sect.start + shift, self.sect.stop + shift)
            W  += stream[shf]
        X = xcorr(W, self.reference[self.sect])
        return W, X
          
    @property
    def delays(self):
        return self.distances / self.c0
    
    @property
    def t(self):
        return self.timebase[self.sect]
    
    @property
    def s(self):
        return self.streams[:, self.sect]
    
    
''' meshgrid helper '''
def gen_probegrid(center, size, steps):
    axis = np.linspace(-size, +size, steps)
    P = np.meshgrid(axis, axis)
    P[0] += center[0]
    P[1] += center[1]
    return P
'''
Generate a meshgrid:
- around a position (beacon)
- of given angular size (size_deg)
- of given number of stesp (steps)
'''
def make_grid(beacon, size_deg, steps):
    r = dist(beacon, (0,0,0))
    size_rad = np.deg2rad(size_deg)
    size_m   = r * size_rad
    return gen_probegrid(beacon, size_m, steps)


class BeamScanner:
    def __init__(self, beamformer, nominal_footprint):
        self.beamformer = beamformer
        self.footprint = nominal_footprint
        self.X = []
        self.W = []
        
    def scan(self, probe_grid, z_grid):
        PX, PY = probe_grid
        self.Z = np.zeros_like(PX)
        for i in tqdm(range(len(PX)), desc='X'):
            for j in tqdm(range(len(PY)), desc='Y', leave=False):
                probe = np.array([PX[i,j], PY[i,j], z_grid])
                W, X = self.beamformer.beamform(self.footprint, probe)
                self.W.append(W)
                self.X.append(X)
                Q = np.max(X)
                self.Z[i, j] = Q