Spark example

← place more code here

Shared code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.grouping;


public class Main {
    public static void main(String[] args) throws URISyntaxException {
        URL resource = Main.class.getResource("GlobalLandTemperaturesByCountry.csv");
        File filePath = Paths.get(resource.toURI()).toFile();
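
        // getResource() returns null if the CSV is not on the classpath; a null check here
        // would fail faster and more clearly than the NullPointerException from toURI().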

        //firstVariant(filePath,spark);

        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .config("spark.master", "local")
                .getOrCreate();

        Dataset<Row> ds = spark.read()
                .format("csv")
                .option("header","true")
                //.option("mode", "DROPMALFORMED")
                .load(filePath.getAbsolutePath());
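
        // Note: with only header=true every column is loaded as a string; the queries below still
        // work because Spark casts implicitly (e.g. inside avg() and year()), but adding
        // .option("inferSchema", "true") would make the column types explicit.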

        /* Question: the assignment's test hints at SQLContext, but I don't see any difference
           between the Datasets obtained through SparkSession and through SQLContext:
        SQLContext sqlContext = new SQLContext(spark);
        Dataset<Row> file_ds = sqlContext
                .read()
                .format("csv")
                .option("header", "true")
                .load(filePath.getAbsolutePath());*/
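
        /* Since Spark 2.0 SQLContext is kept mostly for backward compatibility: SparkSession wraps
           it, and reading through either one yields an equivalent Dataset<Row>. If an SQLContext
           instance is really needed, it can be taken from the session instead of being constructed
           by hand, e.g. SQLContext sqlContext = spark.sqlContext(); */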

        ds.createOrReplaceTempView("GlobalLandTemperaturesByCountry");

        Dataset<Row> result_ds_decade = spark.sql("select " +
                "country," +
                "cast(year(dt) / 10 as int) as decade," +
                "avg(AverageTemperature) as average_temperature " +
                "from GlobalLandTemperaturesByCountry " +
                "group by country, cast(year(dt)/10 as int) " +
                "order by country " +
                "limit 10");

        result_ds_decade.show();
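
        // Sketch of the same per-decade average via the DataFrame API instead of SQL
        // (not required by the task; assumes the columns dt, country and AverageTemperature):
        ds.selectExpr(
                        "country",
                        "cast(year(dt) / 10 as int) as decade",
                        "cast(AverageTemperature as double) as temperature")
                .groupBy("country", "decade")
                .agg(avg("temperature").as("average_temperature"))
                .orderBy("country")
                .show(10);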

        Dataset<Row> result_ds_year = spark.sql("select " +
                "country," +
                "year(dt) as year, " +
                "avg(AverageTemperature) as average_temperature " +
                "from GlobalLandTemperaturesByCountry " +
                "group by country, year(dt) " +
                "order by country,year " +
                "limit 10");

        result_ds_year.show();

        // Attach each yearly average to its decade average: a year belongs to decade year / 10.
        result_ds_decade.join(result_ds_year,
                col("decade").equalTo(col("year").divide(10).cast("int"))).show();

        spark.stop();
    }

        //result_ds.write().parquet("/result_ds");
        //result_ds.write()
        //.format("com.databricks.spark.csv")
        //.option("header", "true")
        //.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
        //.save("result_ds.csv");
        //spark.sql("select AVG(AverageTemperature) from GlobalLandTemperaturesByCountry where dt > '1743-11-01' group by country limit 5").show();
        //ds.select(col("Country"),col("AverageTemperature"),grouping("Country"),avg("AverageTemperature"))
        //.show();
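
    /* Hypothetical helper (not called above), sketching the persistence that the commented-out
       lines experiment with; the method name and outPath parameter are assumptions. It uses the
       built-in csv source from Spark 2.x instead of the external com.databricks.spark.csv package. */
    private static void writeResult(Dataset<Row> result, String outPath) {
        result.write()
                .mode("overwrite")              // replace output from a previous run
                .option("header", "true")       // keep the column names
                .option("compression", "gzip")  // equivalent of the old codec option
                .csv(outPath);
    }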
}