10. Left Outer Join

Author: 百炼 | Published 2019-01-01 18:58

date[2019-01-01]

Data Algorithms (Chapter 4)

Input data
[hadoop@chen spark-data]$ cat ch4/input/users.tsv
u1 UT
u2 GA
u3 CA
u4 CA
u5 GA

[hadoop@chen spark-data]$ cat ch4/input/transactions.tsv
t1 p3 u1 1 300
t2 p1 u2 1 100
t3 p1 u1 1 100
t4 p2 u2 1 10
t5 p4 u4 1 9
t6 p1 u1 1 100
t7 p4 u1 1 9
t8 p4 u5 2 40
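The job reads only the user ID and location from users.tsv, and only the product ID and user ID (the 2nd and 3rd columns) from transactions.tsv. As a minimal sketch of that schema, the Scala script below parses one line of each file into typed records. The names `User`, `Transaction`, `parseUser` and `parseTransaction` are hypothetical, and reading the last two transaction columns as a quantity and an amount is an assumption, since the article never names them.

```scala
// Hypothetical record types for the two input files; the names are illustrative.
// The job itself only needs userId, location and productId.
// Treating the last two transaction columns as quantity and amount is an assumption.
final case class User(userId: String, location: String)
final case class Transaction(
    txId: String, productId: String, userId: String, quantity: Int, amount: Int)

// Split on tabs, as the Spark job does with line.split("\t").
def parseUser(line: String): User = {
  val t = line.split("\t")
  User(t(0), t(1))
}

def parseTransaction(line: String): Transaction = {
  val t = line.split("\t")
  Transaction(t(0), t(1), t(2), t(3).toInt, t(4).toInt)
}

val u  = parseUser("u1\tUT")
val tx = parseTransaction("t8\tp4\tu5\t2\t40")
println(u)  // User(u1,UT)
println(tx) // Transaction(t8,p4,u5,2,40)
```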

Left Outer Join

package org.dataalgorithms.chap04.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
  * Demonstrates how to do a "left outer join" on two RDDs
  * without using Spark's built-in 'leftOuterJoin'.
  *
  * The main purpose here is to compare with the Hadoop
  * MapReduce solution shown earlier in the book; it is for demonstration only.
  * For your project we suggest using Spark's built-in
  * 'leftOuterJoin' or a DataFrame (highly recommended).
  *
  * @author Gaurav Bhardwaj (gauravbhardwajemail@gmail.com)
  * @editor Mahmoud Parsian (mahmoud.parsian@yahoo.com)
  */
object LeftOuterJoin {

  def main(args: Array[String]): Unit = {
    if (args.size < 3) {
      println("Usage: LeftOuterJoin <users-data-path> <transactions-data-path> <output-path>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("LeftOuterJoin")
    val sc = new SparkContext(sparkConf)

    val usersInputFile = args(0)
    val transactionsInputFile = args(1)
    val output = args(2)

    val usersRaw = sc.textFile(usersInputFile)
    val transactionsRaw = sc.textFile(transactionsInputFile)

    val users = usersRaw.map(line => {
      val tokens = line.split("\t")
      (tokens(0), ("L", tokens(1))) // Tagging Locations with L
    })

    val transactions = transactionsRaw.map(line => {
      val tokens = line.split("\t")
      (tokens(2), ("P", tokens(1))) // Tagging Products with P
    })
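    // Debug output. RDD.foreach(println) runs on the executors, so these lines
    // show up in the driver console only when running in local mode.
    println("===========users==========")
    users.foreach(println)
    println("===========transactions==========")
    transactions.foreach(println)

    // The union + groupByKey approach below is expensive (groupByKey shuffles every
    // record); it is listed only for comparison with the Hadoop MapReduce approach.
    // Compare it with the more optimized versions in SparkLeftOuterJoin.scala
    // or DataFramLeftOuterJoin.scala.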
    val all = users union transactions
    println("===========all==========")
    all.foreach(println)

    val grouped = all.groupByKey()
    println("===========group==========")
    grouped.foreach(println)

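    val productLocations = grouped.flatMap {
      case (userId, iterable) =>
        // partition splits the values into the location tag(s) and the product tags.
        // Unlike span, it does not rely on the "L" record coming first, which
        // groupByKey does not guarantee.
        val (location, products) = iterable.partition(_._1 == "L")
        // Left outer join: a transaction whose user has no location gets "UNKNOWN"
        val loc = location.headOption.getOrElse(("L", "UNKNOWN"))
        // Emit the distinct (product, location) pairs for this user
        products.map(p => (p._2, loc._2)).toSet
    }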
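    // Group the (product, location) pairs by product
    val productByLocations = productLocations.groupByKey()

    // (product, number of distinct locations where it was sold)
    val result = productByLocations.map(t => (t._1, t._2.toSet.size))

    println("============= RESULT ==============")
    result.foreach(println)

    result.saveAsTextFile(output) // Writes the result as part files under the output directory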

    // done
    sc.stop()
  }
}
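In the grouping step, `span` and `partition` behave differently on the grouped values. `span` splits off only the longest prefix whose elements match the predicate, while `partition` collects every matching element wherever it appears. Because `groupByKey` does not promise any order for the values inside a group, a location tag that arrives after a product tag would be missed by `span`. The small Scala script below uses plain collections (no Spark) and a made-up group of values to show the difference.

```scala
// Values for one user after groupByKey. Their order is not guaranteed,
// so here the location tag happens to arrive after a product tag (made-up example).
val values: Seq[(String, String)] = Seq(("P", "p3"), ("L", "UT"), ("P", "p1"))

// span: only the longest matching *prefix*. The first element is a "P",
// so the prefix is empty and the location is lost.
val (spanLoc, spanRest) = values.span(_._1 == "L")

// partition: every matching element, wherever it appears.
val (partLoc, partRest) = values.partition(_._1 == "L")

println(spanLoc) // List()
println(partLoc) // List((L,UT))
```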

Output:

===========users==========
(u4,(L,CA))
(u1,(L,UT))
(u5,(L,GA))
(u2,(L,GA))
(u3,(L,CA))
===========transactions==========
(u1,(P,p3))
(u2,(P,p1))
(u1,(P,p1))
(u2,(P,p2))
(u4,(P,p4))
(u1,(P,p1))
(u1,(P,p4))
(u5,(P,p4))
===========all==========
(u4,(L,CA))
(u5,(L,GA))
(u1,(L,UT))
(u2,(L,GA))
(u3,(L,CA))
(u1,(P,p3))
(u2,(P,p1))
(u1,(P,p1))
(u2,(P,p2))
(u4,(P,p4))
(u1,(P,p1))
(u1,(P,p4))
(u5,(P,p4))
===========group==========
(u3,CompactBuffer((L,CA)))
(u5,CompactBuffer((L,GA), (P,p4)))
(u2,CompactBuffer((L,GA), (P,p1), (P,p2)))
(u1,CompactBuffer((L,UT), (P,p3), (P,p1), (P,p1), (P,p4)))
(u4,CompactBuffer((L,CA), (P,p4)))
============= RESULT ==============
(p4,3)
(p1,2)
(p2,1)
(p3,1)
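The class comment recommends Spark's built-in `leftOuterJoin`, which joins a pair RDD of `(K, V)` with one of `(K, W)` and returns `(K, (V, Option[W]))`. The script below is a sketch of that approach using plain Scala collections so it runs without Spark. The `leftOuterJoin` function here is a hypothetical stand-in written for illustration, not Spark's implementation. It joins the (userId, productId) pairs from transactions.tsv with users.tsv, replaces a missing location with "UNKNOWN", and counts the distinct locations per product; on this input that gives the same counts as the RESULT above.

```scala
// Plain-collection stand-in for Spark's leftOuterJoin (hypothetical helper):
// every left record is kept; the right value becomes Some(w), or None if no key matches.
def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] = {
  val rightByKey: Map[K, Seq[W]] =
    right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
  left.flatMap { case (k, v) =>
    val matches: Seq[Option[W]] = rightByKey.get(k) match {
      case Some(ws) => ws.map(w => Some(w))
      case None     => Seq(None)
    }
    matches.map(w => (k, (v, w)))
  }
}

// (userId, location) from users.tsv
val users = Seq("u1" -> "UT", "u2" -> "GA", "u3" -> "CA", "u4" -> "CA", "u5" -> "GA")
// (userId, productId) from transactions.tsv
val purchases = Seq("u1" -> "p3", "u2" -> "p1", "u1" -> "p1", "u2" -> "p2",
                    "u4" -> "p4", "u1" -> "p1", "u1" -> "p4", "u5" -> "p4")

// transactions LEFT OUTER JOIN users, keeping (product, location)
val productLocations: Seq[(String, String)] =
  leftOuterJoin(purchases, users).map { case (_, (product, loc)) =>
    (product, loc.getOrElse("UNKNOWN"))
  }

// Number of distinct locations per product
val uniqueLocationCount: Map[String, Int] =
  productLocations.distinct.groupBy(_._1).map { case (p, pls) => p -> pls.size }

println(uniqueLocationCount.toSeq.sortBy(_._1))

// A hypothetical purchase by a user missing from users.tsv keeps its row with None:
println(leftOuterJoin(Seq("u9" -> "p5"), users)) // List((u9,(p5,None)))
```

With Spark, the stand-in would be replaced by the RDD method `leftOuterJoin`, which avoids hand-tagging records and relying on the order of values inside a group.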


    Article link: https://www.haomeiwen.com/subject/jdfolqtx.html