美文网首页
Spark core完成ETL项目

Spark core完成ETL项目

作者: 喵星人ZC | 来源:发表于2019-06-20 20:36 被阅读0次

此前发布的 Hadoop MR ETL 项目相关文章:
Hadoop MR ETL离线项目1
基于ETL离线项目的改造2

一、SparkCore进行ETL操作,将ETL结果保存到HDFS

package com.soul.bigdata.task.task02

import com.soul.bigdata.Utils.DateTools
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark Core ETL job: reads raw tab-separated access-log lines, normalises
  * each line into an 8-field tuple and writes the result as text files.
  *
  * Expected raw line layout (8 tab-separated fields), e.g.
  * {{{
  * baidu  CN  2  2019-01-21 17:17:56  123.235.248.216  v2.go2yd.com  http://.../x.mp4  785966
  * }}}
  * i.e. cdn, region, level, time, ip, domain, url, traffic.
  *
  * Output records are tuples rendered by `saveAsTextFile`, e.g.
  * `(baidu,CN,2,20190121171756,123.235.248.216,v2.go2yd.com,http://.../x.mp4,785966)`.
  *
  * Lines that do not have exactly 8 fields are NOT dropped: they are emitted
  * as a record of seven empty strings and a traffic of 0.
  */
object LogETLApp {

  /**
    * Job entry point.
    *
    * @param args exactly two arguments: `input_path` (raw log location) and
    *             `output_path` (directory to write; removed first if it exists)
    */
  def main(args: Array[String]): Unit = {
    // Local import so this block does not depend on the file-level import list.
    import scala.util.Try

    if (args.length != 2) {
      // A usage error must be visible to schedulers: report on stderr and
      // exit with a non-zero status.
      System.err.println("Usage: please input 2 params: input_path output_path")
      System.exit(1)
    }
    val input_path = args(0)
    val output_path = args(1)

    // Master and application name are expected to come from spark-submit.
    // For local debugging: sparkconf.setMaster("local[2]").setAppName("CDNLogParseApp")
    val sparkconf = new SparkConf()
    val sc = new SparkContext(sparkconf)

    try {
      val log = sc.textFile(input_path)

      // NOTE(review): `tools` is captured by the map closure below, so
      // DateTools presumably has to be serializable -- confirm.
      val tools = new DateTools()

      // Number of tab-separated fields in a well-formed raw line.
      val expectedFieldCount = 8

      val etlRDD = log.map { line =>
        val splits = line.split("\t")
        if (splits.length == expectedFieldCount) {
          val cdn = splits(0)
          val region = splits(1)
          val level = splits(2)
          // parseTime is project-defined; the sample output suggests it
          // yields a compact yyyyMMddHHmmss value -- confirm in DateTools.
          val time = tools.parseTime(splits(3))
          val ip = splits(4)
          val domain = splits(5)
          val url = splits(6)
          // Parse as Long (the field's declared type) so values above
          // Int.MaxValue are kept; an unparsable value falls back to 0.
          val traffic = Try(splits(7).toLong).getOrElse(0L)

          (cdn, region, level, time, ip, domain, url, traffic)
        } else {
          // Malformed line: keep a placeholder record with zero traffic.
          ("", "", "", "", "", "", "", 0L)
        }
      }

      // Remove any previous output first; saveAsTextFile refuses to write
      // into an existing directory. The filesystem is resolved from the
      // output path itself, using the Hadoop configuration held by the
      // SparkContext, so non-default filesystems work as well.
      val outputDir = new Path(output_path)
      val fileSystem = outputDir.getFileSystem(sc.hadoopConfiguration)
      if (fileSystem.exists(outputDir)) {
        fileSystem.delete(outputDir, true)
      }

      // coalesce(1) so the result lands in a single part file.
      etlRDD.coalesce(1).saveAsTextFile(output_path)
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }
  }

}

二、SparkCore完成统计分析

package com.soul.bigdata.task.task02

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark Core analysis job: counts the number of records per domain in the
  * comma-separated ETL output, equivalent to
  * `select domain, count(*) from g6_access group by domain`.
  *
  * Expected input line layout (domain is the 6th comma-separated field), e.g.
  * {{{
  * baidu,CN,2,20190106102231,171.12.212.129,v3.go2yd.com,http://.../x.mp4,975578
  * }}}
  *
  * Output records are `(domain,count)` tuples rendered by `saveAsTextFile`.
  */
object LogAnalysisApp {

  /**
    * Job entry point.
    *
    * @param args exactly two arguments: `input_path` (ETL output location) and
    *             `output_path` (directory to write; removed first if it exists)
    */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      // A usage error must be visible to schedulers: report on stderr and
      // exit with a non-zero status.
      System.err.println("Usage: please input 2 params: input_path output_path")
      System.exit(1)
    }
    val input_path = args(0)
    val output_path = args(1)

    // Master and application name are expected to come from spark-submit.
    // For local debugging: sparkconf.setMaster("local[2]").setAppName("LogAnalysisApp")
    val sparkconf = new SparkConf()
    val sc = new SparkContext(sparkconf)

    try {
      val lines = sc.textFile(input_path)

      // Zero-based index of the domain field in a comma-separated record.
      val domainIndex = 5

      // Lines with too few fields (e.g. blank lines) are skipped instead of
      // failing the whole job with an ArrayIndexOutOfBoundsException.
      val domainRDD = lines
        .map(_.split(","))
        .filter(_.length > domainIndex)
        .map(splits => (splits(domainIndex), 1))

      // reduceByKey already yields exactly one (domain, count) pair per
      // domain, so no further grouping or flattening is required.
      val resultRDD = domainRDD.reduceByKey(_ + _)

      // Remove any previous output first; saveAsTextFile refuses to write
      // into an existing directory. The filesystem is resolved from the
      // output path itself, using the Hadoop configuration held by the
      // SparkContext, so non-default filesystems work as well.
      val outputDir = new Path(output_path)
      val fileSystem = outputDir.getFileSystem(sc.hadoopConfiguration)
      if (fileSystem.exists(outputDir)) {
        fileSystem.delete(outputDir, true)
      }

      // coalesce(1) so the result lands in a single part file.
      resultRDD.coalesce(1).saveAsTextFile(output_path)
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }
  }
}

相关文章

网友评论

      本文标题:Spark core完成ETL项目

      本文链接:https://www.haomeiwen.com/subject/vyksqctx.html