GAIA all-sky map

You can find this application in the demos folder of your Jupyter notebook environment.

    • gaia_all_sky_map.ipynb
  • Plot a 2D histogram with GAIA data

    Some datasets are too large to store on any single computer. To analyze such datasets, scientists need ways to work with distributed filesystems. Apache Spark is one engine to deal with such big data.

    With Camber, you can create a Spark cluster in one line of Python. This tutorial does just that, then uses the cluster to process a large CSV from the GAIA space archive.

    Provision a Spark cluster

    First, import Camber and instantiate your cluster. This may take a few moments to process.

    # provision with LARGE instance
    import camber
    spark = camber.spark.connect(engine_size="LARGE")

    When working with large files and spark, it’s a good practice to define your Schema beforehand. Correctly specified types helps optimize for downstream queries.

    create schema
    # Read in unsorted spark source catalog and set schema
    from pyspark.sql.types import *
    schema = StructType([StructField('solution_id', LongType(), True),
                         StructField('designation', StringType(), True), 
                         StructField('source_id', LongType(), True), 
                         StructField('random_index', IntegerType(), True), 
                         StructField('ref_epoch', DoubleType(), True), 
                         StructField('ra', DoubleType(), True), StructField('ra_error', DoubleType(), True), 
                         StructField('dec', DoubleType(), True), StructField('dec_error', DoubleType(), True), 
                         StructField('parallax', DoubleType(), True), StructField('parallax_error', DoubleType(), True),
                         StructField('parallax_over_error', DoubleType(), True), StructField('pm', DoubleType(), True), 
                         StructField('pmra', DoubleType(), True), StructField('pmra_error', DoubleType(), True), 
                         StructField('pmdec', DoubleType(), True), StructField('pmdec_error', DoubleType(), True), 
                         StructField('ra_dec_corr', DoubleType(), True), StructField('ra_parallax_corr', DoubleType(), True), 
                         StructField('ra_pmra_corr', DoubleType(), True), StructField('ra_pmdec_corr', DoubleType(), True), 
                         StructField('dec_parallax_corr', DoubleType(), True), StructField('dec_pmra_corr', DoubleType(), True), 
                         StructField('dec_pmdec_corr', DoubleType(), True), StructField('parallax_pmra_corr', DoubleType(), True), 
                         StructField('parallax_pmdec_corr', DoubleType(), True), StructField('pmra_pmdec_corr', DoubleType(), True), 
                         StructField('astrometric_n_obs_al', IntegerType(), True), StructField('astrometric_n_obs_ac', IntegerType(), True), 
                         StructField('astrometric_n_good_obs_al', IntegerType(), True), StructField('astrometric_n_bad_obs_al', IntegerType(), True), 
                         StructField('astrometric_gof_al', DoubleType(), True), StructField('astrometric_chi2_al', DoubleType(), True), 
                         StructField('astrometric_excess_noise', DoubleType(), True), StructField('astrometric_excess_noise_sig', DoubleType(), True), StructField('astrometric_params_solved', IntegerType(), True), 
                         StructField('astrometric_primary_flag', BooleanType(), True), StructField('nu_eff_used_in_astrometry', DoubleType(), True), StructField('pseudocolour', DoubleType(), True), StructField('pseudocolour_error', DoubleType(), True), StructField('ra_pseudocolour_corr', DoubleType(), True), 
                         StructField('dec_pseudocolour_corr', DoubleType(), True), StructField('parallax_pseudocolour_corr', DoubleType(), True), StructField('pmra_pseudocolour_corr', DoubleType(), True), StructField('pmdec_pseudocolour_corr', DoubleType(), True), 
                         StructField('astrometric_matched_transits', IntegerType(), True), StructField('visibility_periods_used', IntegerType(), True), StructField('astrometric_sigma5d_max', DoubleType(), True), StructField('matched_transits', IntegerType(), True), StructField('new_matched_transits', IntegerType(), True), 
                         StructField('matched_transits_removed', IntegerType(), True), StructField('ipd_gof_harmonic_amplitude', DoubleType(), True), StructField('ipd_gof_harmonic_phase', DoubleType(), True), StructField('ipd_frac_multi_peak', IntegerType(), True), StructField('ipd_frac_odd_win', IntegerType(), True), 
                         StructField('ruwe', DoubleType(), True), StructField('scan_direction_strength_k1', DoubleType(), True), StructField('scan_direction_strength_k2', DoubleType(), True), StructField('scan_direction_strength_k3', DoubleType(), True), StructField('scan_direction_strength_k4', DoubleType(), True), 
                         StructField('scan_direction_mean_k1', DoubleType(), True), StructField('scan_direction_mean_k2', DoubleType(), True), StructField('scan_direction_mean_k3', DoubleType(), True), StructField('scan_direction_mean_k4', DoubleType(), True), StructField('duplicated_source', BooleanType(), True), StructField('phot_g_n_obs', IntegerType(), True), StructField('phot_g_mean_flux', DoubleType(), True), StructField('phot_g_mean_flux_error', DoubleType(), True), StructField('phot_g_mean_flux_over_error', DoubleType(), True), StructField('phot_g_mean_mag', DoubleType(), True), StructField('phot_bp_n_obs', IntegerType(), True), StructField('phot_bp_mean_flux', DoubleType(), True), StructField('phot_bp_mean_flux_error', DoubleType(), True), StructField('phot_bp_mean_flux_over_error', DoubleType(), True), StructField('phot_bp_mean_mag', DoubleType(), True), StructField('phot_rp_n_obs', IntegerType(), True), StructField('phot_rp_mean_flux', DoubleType(), True), StructField('phot_rp_mean_flux_error', DoubleType(), True), StructField('phot_rp_mean_flux_over_error', DoubleType(), True), StructField('phot_rp_mean_mag', DoubleType(), True), StructField('phot_bp_n_contaminated_transits', IntegerType(), True), StructField('phot_bp_n_blended_transits', IntegerType(), True), StructField('phot_rp_n_contaminated_transits', IntegerType(), True), StructField('phot_rp_n_blended_transits', IntegerType(), True), StructField('phot_proc_mode', IntegerType(), True), StructField('phot_bp_rp_excess_factor', DoubleType(), True), StructField('bp_rp', DoubleType(), True), StructField('bp_g', DoubleType(), True), StructField('g_rp', DoubleType(), True), StructField('dr2_radial_velocity', DoubleType(), True), StructField('dr2_radial_velocity_error', DoubleType(), True), StructField('dr2_rv_nb_transits', IntegerType(), True), StructField('dr2_rv_template_teff', DoubleType(), True), StructField('dr2_rv_template_logg', DoubleType(), True), StructField('dr2_rv_template_fe_h', DoubleType(), True), StructField('l', DoubleType(), True), StructField('b', DoubleType(), True), StructField('ecl_lon', DoubleType(), True), StructField('ecl_lat', DoubleType(), True)])

    Read in Gaia data from The Open Stash into Spark Dataframe

    Now use the stash package to get the GAIA dataset from the open stash:

    from camber.stash import open_stash
    df = open_stash.read_spark(path="datasets/gaia/gedr3/GaiaSource_*.csv.gz", spark_session=spark, format="csv", header=True, schema=schema)

    Transform dataframe into 2D histogram

    Now that you have access to the data, you can use the PySpark SQL module to compute a histogram:

    create histogram function
    import numpy as np
    import pyspark.sql.functions as F
    from pyspark.sql.types import *
    
    # We need to define a function to compute the histogram
    def histogram2d(df, cond1, cond2, numbins1, numbins2, min1=None, max1=None, min2=None, max2=None):
         """
         Function adapted from the Astronomy eXtensions for Spark (AXS) code base
    
         Uses `cond1` and `cond2` colunm expressions to obtain data for 2D histogram calculation. The data on
         x axis will be binned into `numbins1` bins. The data on y axis will be binned into `numbins2` bins.
         If `min1`, `max1`, `min2` or `max2` are not spacified, they will be calculated using an additional pass
         through the data.
         The method returns x, y and z 2-D numpy arrays (see numpy.mgrid) which can be used as an input to
         `matplotlib.pcolormesh`.
    
         :param cond1: Column expression determining the data on x axis.
         :param cond2: Column expression determining the data on y axis.
         :param numbins1: Number of bins for x axis.
         :param numbins2: Number of bins for y axis.
         :param min1: Optional minimum value for x axis data.
         :param max1: Optional maximum value for x axis data.
         :param min2: Optional minimum value for y axis data.
         :param max2: Optional maximum value for y axis data.
         :return: x, y, z 2-D numpy "meshgrid" arrays (see numpy.mgrid)
         """
         colname1 = "axs_hist_col1"
         colname2 = "axs_hist_col2"
         res = df.select(cond1.alias(colname1), cond2.alias(colname2))
    
         if min1 is None or max1 is None or min2 is None or max2 is None:
             mm = res.select(F.min(res[colname1]).alias("min1"), F.max(res[colname1]).alias("max1"),
                             F.min(res[colname2]).alias("min2"), F.max(res[colname2]).alias("max2")).\
                 collect()
             (min1, max1, min2, max2) = (mm[0]["min1"], mm[0]["max1"], mm[0]["min2"], mm[0]["max2"])
      
         rng1 = float(max1 - min1)
         rng2 = float(max2 - min2)
         step1 = rng1 / numbins1
         step2 = rng2 / numbins2
        
         hist2d = res.withColumn("bin1", ((res[colname1]-min1)/step1).cast("int")*step1+min1) \
             .withColumn("bin2", ((res[colname2]-min2)/step2).cast("int")*step2+min2).\
             groupBy("bin1", "bin2").count()
       
         hist2data = hist2d.orderBy(hist2d.bin1, hist2d.bin2).collect()
         
         bin1 = list(map(lambda row: row.bin1, hist2data))
         bin2 = list(map(lambda row: row.bin2, hist2data))
         vals = list(map(lambda row: row["count"], hist2data))
        
         x, y = np.mgrid[slice(min1, max1 + step1, step1),
                         slice(min2, max2 + step2, step2)]
         z = np.zeros(x.shape)
         
         for b1, b2, v in zip(bin1, bin2, vals):
             z[round((b1-min1)/step1)][round((b2-min2)/step2)] = v
         return x, y, z

    Check out the AXS for more information about how to utilize Spark for astronomy related workflows.

    Now run the histogram2d() function on the GAIA dataframe:

    x, y, z = histogram2d(df, df.ra, df.dec, 1000, 1000, min1=0, max1=360, min2=-90, max2=90)

    Note that it may take a few minutes for the preceding command to complete. This is a massive sort―on about 1TB of data.

    Plot 2D histogram of Galaxy star counts and save to file

    import matplotlib
    import matplotlib.pyplot as plt
    plt.pcolormesh(x, y, z, norm=matplotlib.colors.LogNorm(1,1.e5), cmap='viridis')
    plt.xlabel("Right Ascension")
    plt.ylabel("Declination")
    plt.colorbar(label='Counts')
    plt.savefig("gaia_hist.png")

    GAIA all-sky map

    Stop the Spark cluster

    spark.stop()