Earlier articles on the Hadoop MR ETL project:
1. Hadoop MR ETL offline project
2. Reworking the offline ETL project
1. Performing the ETL with Spark Core and saving the results to HDFS
package com.soul.bigdata.task.task02

import com.soul.bigdata.Utils.DateTools
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object LogETLApp {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Usage: please input 2 params: input_path output_path")
      System.exit(1)
    }
    val input_path = args(0)
    val output_path = args(1)

    val sparkconf = new SparkConf()
    // For local testing: sparkconf.setMaster("local[2]").setAppName("CDNLogParseApp")
    val sc = new SparkContext(sparkconf)

    // For local testing: val log = sc.textFile("file:///D:\\RZ-G6\\2019G6\\data\\access2.log")
    val log = sc.textFile(input_path)

    // Input record (tab-separated), e.g.:
    // baidu CN 2 2019-01-21 17:17:56 123.235.248.216 v2.go2yd.com http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4 785966
    val tools = new DateTools()

    val etlRDD = log.map(x => {
      val splits = x.split("\t")
      var traffic = 0L
      if (splits.length == 8) {
        val cdn = splits(0)
        val region = splits(1)
        val level = splits(2)
        val time = tools.parseTime(splits(3))
        val ip = splits(4)
        val domain = splits(5)
        val url = splits(6)
        // A dirty record may carry a non-numeric traffic field; fall back to 0
        try {
          traffic = splits(7).toLong
        } catch {
          case _: NumberFormatException => traffic = 0L
        }
        (cdn, region, level, time, ip, domain, url, traffic)
        // Alternatively, emit a tab-separated string:
        // cdn + "\t" + region + "\t" + level + "\t" + time + "\t" + ip + "\t" + domain + "\t" + url + "\t" + traffic
      } else {
        // Malformed line: keep the record shape but blank out the fields
        ("", "", "", "", "", "", "", traffic)
      }
    })
    // Output record, e.g.:
    // (baidu,CN,2,20190121171756,123.235.248.216,v2.go2yd.com,http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4,785966)

    // Delete any existing output before writing to HDFS so the job can be rerun
    val configuration = new Configuration()
    val fileSystem = FileSystem.get(configuration)
    if (fileSystem.exists(new Path(output_path))) {
      fileSystem.delete(new Path(output_path), true)
    }
    etlRDD.coalesce(1).saveAsTextFile(output_path)

    sc.stop()
  }
}
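The DateTools helper imported above is not part of the listing. Below is a minimal sketch of what it might look like, assuming only what the comments show: parseTime turns "2019-01-21 17:17:56" into the compact "20190121171756" seen in the output record. Note the class must be Serializable, because LogETLApp builds one instance on the driver and captures it inside the map closure.

package com.soul.bigdata.Utils

import java.text.SimpleDateFormat
import java.util.Locale

// Hypothetical sketch of DateTools; the real source is not shown in this post.
// Serializable so the instance can be shipped to executors inside the closure.
class DateTools extends Serializable {

  // Assumed behavior: "2019-01-21 17:17:56" -> "20190121171756"
  // SimpleDateFormat is not thread-safe, so the formats are created per call.
  def parseTime(time: String): String = {
    val inputFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)
    val outputFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.ENGLISH)
    try {
      outputFormat.format(inputFormat.parse(time))
    } catch {
      case _: Exception => "" // unparsable timestamp: return empty, like the other blanked fields
    }
  }
}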
2. Completing the statistical analysis with Spark Core
package com.soul.bigdata.task.task02

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object LogAnalysisApp {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Usage: please input 2 params: input_path output_path")
      System.exit(1)
    }
    val input_path = args(0)
    val output_path = args(1)

    val sparkconf = new SparkConf()
    // For local testing: sparkconf.setMaster("local[2]").setAppName("LogAnalysisApp")
    val sc = new SparkContext(sparkconf)

    // For local testing: val lines = sc.textFile("file:///D:\\RZ-G6\\2019G6\\data\\etl.log")
    val lines = sc.textFile(input_path)

    // Input is the ETL output, e.g.:
    // (baidu,CN,2,20190106102231,171.12.212.129,v3.go2yd.com,http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4,975578)
    val domainRDD = lines.map(x => {
      val splits = x.split(",")
      val domain = splits(5) // domain field; index 5 is unaffected by the leading "("
      (domain, 1)
    })

    // Equivalent SQL: select domain, count(*) from g6_access group by domain;
    // reduceByKey alone already yields one (domain, count) pair per domain,
    // so no further groupByKey is needed.
    val resultRDD = domainRDD.reduceByKey(_ + _)

    // Delete any existing output before writing so the job can be rerun
    val configuration = new Configuration()
    val fileSystem = FileSystem.get(configuration)
    if (fileSystem.exists(new Path(output_path))) {
      fileSystem.delete(new Path(output_path), true)
    }
    resultRDD.coalesce(1).saveAsTextFile(output_path)

    sc.stop()
  }
}
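The same aggregation can also be written with Spark SQL. The sketch below is illustrative only, assuming Spark 2.x is available: the object name LogAnalysisSQLApp is made up here, the g6_access view name is borrowed from the SQL comment above, and the ETL output is re-read as comma-separated text.

package com.soul.bigdata.task.task02

import org.apache.spark.sql.SparkSession

// Illustrative Spark SQL version of the same group-by (assumes Spark 2.x).
object LogAnalysisSQLApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("LogAnalysisSQLApp").getOrCreate()
    import spark.implicits._

    // Re-read the ETL output and keep only the domain field (index 5)
    val domains = spark.sparkContext.textFile(args(0))
      .map(_.split(",")(5))
      .toDF("domain")
    domains.createOrReplaceTempView("g6_access")

    // Same query as the SQL comment in LogAnalysisApp
    spark.sql("select domain, count(*) as cnt from g6_access group by domain").show(false)

    spark.stop()
  }
}

show() is used here only to inspect the result; writing it back out would mirror the saveAsTextFile step above.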