GAIA all-sky map
You can find this application in the demos
folder of your Jupyter notebook environment.
- gaia_all_sky_map.ipynb
Some datasets are too large to store on any single computer. To analyze such datasets, scientists need ways to work with distributed filesystems. Apache Spark is one engine to deal with such big data.
With Camber, you can create a Spark cluster in one line of Python. This tutorial uses Spark to process a large CSV from the GAIA space archive hosted on the Camber public stash.
Provision a Spark cluster
First, instantiate your Spark cluster using Camber. This may take a few moments.
# provision with LARGE instance
import camber
spark = camber.spark.connect(engine_size="LARGE")
When working with large files and spark, it’s a good practice to define your Schema beforehand. The Spark execution engine can work more efficiently when data types are predefined.
create schema
# Read in unsorted spark source catalog and set schema
from pyspark.sql.types import *
schema = StructType([StructField('solution_id', LongType(), True),
StructField('designation', StringType(), True),
StructField('source_id', LongType(), True),
StructField('random_index', IntegerType(), True),
StructField('ref_epoch', DoubleType(), True),
StructField('ra', DoubleType(), True), StructField('ra_error', DoubleType(), True),
StructField('dec', DoubleType(), True), StructField('dec_error', DoubleType(), True),
StructField('parallax', DoubleType(), True), StructField('parallax_error', DoubleType(), True),
StructField('parallax_over_error', DoubleType(), True), StructField('pm', DoubleType(), True),
StructField('pmra', DoubleType(), True), StructField('pmra_error', DoubleType(), True),
StructField('pmdec', DoubleType(), True), StructField('pmdec_error', DoubleType(), True),
StructField('ra_dec_corr', DoubleType(), True), StructField('ra_parallax_corr', DoubleType(), True),
StructField('ra_pmra_corr', DoubleType(), True), StructField('ra_pmdec_corr', DoubleType(), True),
StructField('dec_parallax_corr', DoubleType(), True), StructField('dec_pmra_corr', DoubleType(), True),
StructField('dec_pmdec_corr', DoubleType(), True), StructField('parallax_pmra_corr', DoubleType(), True),
StructField('parallax_pmdec_corr', DoubleType(), True), StructField('pmra_pmdec_corr', DoubleType(), True),
StructField('astrometric_n_obs_al', IntegerType(), True), StructField('astrometric_n_obs_ac', IntegerType(), True),
StructField('astrometric_n_good_obs_al', IntegerType(), True), StructField('astrometric_n_bad_obs_al', IntegerType(), True),
StructField('astrometric_gof_al', DoubleType(), True), StructField('astrometric_chi2_al', DoubleType(), True),
StructField('astrometric_excess_noise', DoubleType(), True), StructField('astrometric_excess_noise_sig', DoubleType(), True), StructField('astrometric_params_solved', IntegerType(), True),
StructField('astrometric_primary_flag', BooleanType(), True), StructField('nu_eff_used_in_astrometry', DoubleType(), True), StructField('pseudocolour', DoubleType(), True), StructField('pseudocolour_error', DoubleType(), True), StructField('ra_pseudocolour_corr', DoubleType(), True),
StructField('dec_pseudocolour_corr', DoubleType(), True), StructField('parallax_pseudocolour_corr', DoubleType(), True), StructField('pmra_pseudocolour_corr', DoubleType(), True), StructField('pmdec_pseudocolour_corr', DoubleType(), True),
StructField('astrometric_matched_transits', IntegerType(), True), StructField('visibility_periods_used', IntegerType(), True), StructField('astrometric_sigma5d_max', DoubleType(), True), StructField('matched_transits', IntegerType(), True), StructField('new_matched_transits', IntegerType(), True),
StructField('matched_transits_removed', IntegerType(), True), StructField('ipd_gof_harmonic_amplitude', DoubleType(), True), StructField('ipd_gof_harmonic_phase', DoubleType(), True), StructField('ipd_frac_multi_peak', IntegerType(), True), StructField('ipd_frac_odd_win', IntegerType(), True),
StructField('ruwe', DoubleType(), True), StructField('scan_direction_strength_k1', DoubleType(), True), StructField('scan_direction_strength_k2', DoubleType(), True), StructField('scan_direction_strength_k3', DoubleType(), True), StructField('scan_direction_strength_k4', DoubleType(), True),
StructField('scan_direction_mean_k1', DoubleType(), True), StructField('scan_direction_mean_k2', DoubleType(), True), StructField('scan_direction_mean_k3', DoubleType(), True), StructField('scan_direction_mean_k4', DoubleType(), True), StructField('duplicated_source', BooleanType(), True), StructField('phot_g_n_obs', IntegerType(), True), StructField('phot_g_mean_flux', DoubleType(), True), StructField('phot_g_mean_flux_error', DoubleType(), True), StructField('phot_g_mean_flux_over_error', DoubleType(), True), StructField('phot_g_mean_mag', DoubleType(), True), StructField('phot_bp_n_obs', IntegerType(), True), StructField('phot_bp_mean_flux', DoubleType(), True), StructField('phot_bp_mean_flux_error', DoubleType(), True), StructField('phot_bp_mean_flux_over_error', DoubleType(), True), StructField('phot_bp_mean_mag', DoubleType(), True), StructField('phot_rp_n_obs', IntegerType(), True), StructField('phot_rp_mean_flux', DoubleType(), True), StructField('phot_rp_mean_flux_error', DoubleType(), True), StructField('phot_rp_mean_flux_over_error', DoubleType(), True), StructField('phot_rp_mean_mag', DoubleType(), True), StructField('phot_bp_n_contaminated_transits', IntegerType(), True), StructField('phot_bp_n_blended_transits', IntegerType(), True), StructField('phot_rp_n_contaminated_transits', IntegerType(), True), StructField('phot_rp_n_blended_transits', IntegerType(), True), StructField('phot_proc_mode', IntegerType(), True), StructField('phot_bp_rp_excess_factor', DoubleType(), True), StructField('bp_rp', DoubleType(), True), StructField('bp_g', DoubleType(), True), StructField('g_rp', DoubleType(), True), StructField('dr2_radial_velocity', DoubleType(), True), StructField('dr2_radial_velocity_error', DoubleType(), True), StructField('dr2_rv_nb_transits', IntegerType(), True), StructField('dr2_rv_template_teff', DoubleType(), True), StructField('dr2_rv_template_logg', DoubleType(), True), StructField('dr2_rv_template_fe_h', DoubleType(), True), StructField('l', DoubleType(), True), StructField('b', DoubleType(), True), StructField('ecl_lon', DoubleType(), True), StructField('ecl_lat', DoubleType(), True)])
Use Spark to read GAIA dataset from the public stash
Now use the stash package to get the GAIA dataset from the open stash:
import camber
df = camber.stash.public.read_spark(path="datasets/gaia/gedr3/GaiaSource_*.csv.gz", spark_session=spark, format="csv", header=True, schema=schema)
Compute galaxy star counts for a 2D histogram
Now that you have access to the data, you can use the PySpark SQL module to compute a histogram:
create histogram function
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import *
# We need to define a function to compute the histogram
def histogram2d(df, cond1, cond2, numbins1, numbins2, min1=None, max1=None, min2=None, max2=None):
"""
Function adapted from the Astronomy eXtensions for Spark (AXS) code base
Uses `cond1` and `cond2` colunm expressions to obtain data for 2D histogram calculation. The data on
x axis will be binned into `numbins1` bins. The data on y axis will be binned into `numbins2` bins.
If `min1`, `max1`, `min2` or `max2` are not spacified, they will be calculated using an additional pass
through the data.
The method returns x, y and z 2-D numpy arrays (see numpy.mgrid) which can be used as an input to
`matplotlib.pcolormesh`.
:param cond1: Column expression determining the data on x axis.
:param cond2: Column expression determining the data on y axis.
:param numbins1: Number of bins for x axis.
:param numbins2: Number of bins for y axis.
:param min1: Optional minimum value for x axis data.
:param max1: Optional maximum value for x axis data.
:param min2: Optional minimum value for y axis data.
:param max2: Optional maximum value for y axis data.
:return: x, y, z 2-D numpy "meshgrid" arrays (see numpy.mgrid)
"""
colname1 = "axs_hist_col1"
colname2 = "axs_hist_col2"
res = df.select(cond1.alias(colname1), cond2.alias(colname2))
if min1 is None or max1 is None or min2 is None or max2 is None:
mm = res.select(F.min(res[colname1]).alias("min1"), F.max(res[colname1]).alias("max1"),
F.min(res[colname2]).alias("min2"), F.max(res[colname2]).alias("max2")).\
collect()
(min1, max1, min2, max2) = (mm[0]["min1"], mm[0]["max1"], mm[0]["min2"], mm[0]["max2"])
rng1 = float(max1 - min1)
rng2 = float(max2 - min2)
step1 = rng1 / numbins1
step2 = rng2 / numbins2
hist2d = res.withColumn("bin1", ((res[colname1]-min1)/step1).cast("int")*step1+min1) \
.withColumn("bin2", ((res[colname2]-min2)/step2).cast("int")*step2+min2).\
groupBy("bin1", "bin2").count()
hist2data = hist2d.orderBy(hist2d.bin1, hist2d.bin2).collect()
bin1 = list(map(lambda row: row.bin1, hist2data))
bin2 = list(map(lambda row: row.bin2, hist2data))
vals = list(map(lambda row: row["count"], hist2data))
x, y = np.mgrid[slice(min1, max1 + step1, step1),
slice(min2, max2 + step2, step2)]
z = np.zeros(x.shape)
for b1, b2, v in zip(bin1, bin2, vals):
z[round((b1-min1)/step1)][round((b2-min2)/step2)] = v
return x, y, z
Check out the AXS for more information about how to utilize Spark for astronomy related workflows.
Now run the histogram2d()
function on the GAIA dataframe:
x, y, z = histogram2d(df, df.ra, df.dec, 1000, 1000, min1=0, max1=360, min2=-90, max2=90)
Note that it may take a few minutes for the preceding command to complete. This is a massive sort―on about 1TB of data.
Plot the histogram
import matplotlib
import matplotlib.pyplot as plt
plt.pcolormesh(x, y, z, norm=matplotlib.colors.LogNorm(1,1.e5), cmap='viridis')
plt.xlabel("Right Ascension")
plt.ylabel("Declination")
plt.colorbar(label='Counts')
plt.savefig("gaia_hist.png")
Stop the Spark cluster
spark.stop()