Source code for diviner.plot_utils

# -*- coding: utf-8 -*-

from matplotlib.pylab import gcf, subplots, figure
# from mpl_toolkits.mplot3d import axes3d
import matplotlib.animation as animation
import numpy as np
import sys
import pandas as pd


[docs]def save_to_www(fname, **kwargs): gcf().savefig("/u/paige/maye/WWW/calib/"+fname, **kwargs)
[docs]def plot_calib_block(df, label, id, det='a6_11', limits=None, **kwargs): """Plot one designated calibration block. Parameters: ----------- df: pandas Dataframe that has the block labels defined to use as filter. label: one of 'calib','bb','sv','st' id: number of block label to be plotted det: identifier of the detector, a6_11 is default being used. """ if not label.endswith('_block_labels'): label = label + '_block_labels' dfnow = df[df[label] == id] df_to_plot = pd.DataFrame(index=dfnow.index) boolean_selectors = ['is_moving', 'is_spaceview', 'is_bbview', 'is_stview'] for sel in boolean_selectors: # get the timeseries for chosen detector where selector is true timeseries = dfnow[dfnow[sel]][det] if len(timeseries) > 0: # add to dataframe, cut off 'is_' from name (nicer for plot) df_to_plot[sel[3:]] = timeseries ax = df_to_plot.plot(style='.', **kwargs) # ax.yaxis.set_major_formatter(y_formatter) ax.set_title(det) if limits: ax.set_ylim(limits)
[docs]def plot_all_calib_blocks(df, **kwargs): """Plot all calibration blocks found in the provided dataframe. Parameters: =========== df: pandas Dataframe with block labels defined (went through define_sdtype()) kwargs: same keyword arguments as plot_calib_block """ calib_ids = df.calib_block_labels.unique().tolist() # check if the calib block has actually calibration data for calid in calib_ids[:]: if not any(df[df.calib_block_labels == calid]['is_calib']): print("Calib block {0} has no caldata.".format(calid)) calib_ids.remove(calid) length = len(calib_ids) if not length%2 == 0: length += 1 fig, axes = subplots(length/2, 2) for i, calid in enumerate(calib_ids): plot_calib_block(df, 'calib', calid, ax=axes.flatten()[i], **kwargs) fig.suptitle("Cal-Blocks {0}".format(calib_ids))
[docs]def plot_all_channels(df_in, det_list, only_thermal=True, **kwargs): """plot the data for each det in det_list for all channels. Parameters: df pandas DataFrame det_list list of detector numbers between 1..21 **kwargs keyword arguments for subplots call """ df = df_in.resample('10s') print("Resampled to 10 s.") fig, axes = subplots(3, 3, **kwargs) for ch in range(1, 7): if ch in [1, 2] and only_thermal: continue axis = axes.flatten()[ch-1] cols = ['a'+str(ch)+'_'+str(i).zfill(2) for i in det_list] df[cols].plot(ax=axis) axis.legend(loc='best') axis.set_title('Channel {0}'.format(ch)) for ch in range(1, 4): axis = axes.flatten()[ch-1+6] cols = ['b'+str(ch)+'_'+str(i).zfill(2) for i in det_list] df[cols].plot(ax=axis) axis.set_title('Channel {0}'.format(ch+6)) axis.legend(loc='best')
[docs]def create_plot_pointings(azim_start=-60, azim_min=-80, azim_max=-40, elev_start=30, elev_min=-10): # azimuths azis_down = list(range(azim_start, azim_min, -1)) azis_up = list(range(azim_min, azim_max)) azis_down2 = list(range(azim_max, azim_start, -1)) azis = azis_down + azis_up + azis_down2 # elevations elevs_down = list(range(elev_start, elev_min, -1)) elevs_up = list(range(elev_min, elev_start)) elevs = elevs_down + elevs_up # return pointing tuples return list(zip(elevs, azis))
[docs]def plot3d_animation(df_in): df = df_in.resample('1min') Z = df.values.T x = np.arange(len(df)) y = np.arange(21) X, Y = np.meshgrid(x, y) pointings = create_plot_pointings() def update_pointing(pointing): ax.view_init(*pointing) fig = figure(figsize=(10,8)) ax = fig.add_subplot(111, projection='3d') ax.plot_wireframe(X, Y, Z, rstride=5, cstride=5) view_ani = animation.FuncAnimation(fig, update_pointing, pointings, interval=100, repeat_delay=2000)
[docs]class ProgressBar: def __init__(self, iterations): self.iterations = iterations self.prog_bar = '[]' self.fill_char = '*' self.width = 50 self.__update_amount(0)
[docs] def animate(self, iter): print('\r', self, end='') sys.stdout.flush() self.update_iteration(iter + 1)
[docs] def update_iteration(self, elapsed_iter): self.__update_amount((elapsed_iter / float(self.iterations)) * 100.0) self.prog_bar += ' %d of %s complete' % (elapsed_iter, self.iterations)
def __update_amount(self, new_amount): percent_done = int(round((new_amount / 100.0) * 100.0)) all_full = self.width - 2 num_hashes = int(round((percent_done / 100.0) * all_full)) self.prog_bar = '[' + self.fill_char * num_hashes + ' ' * (all_full - num_hashes) + ']' pct_place = (len(self.prog_bar) // 2) - len(str(percent_done)) pct_string = '%d%%' % percent_done self.prog_bar = self.prog_bar[0:pct_place] + \ (pct_string + self.prog_bar[pct_place + len(pct_string):]) def __str__(self): return str(self.prog_bar)