Reading and Writing ES with Spark

Author: 阿粒_lxf | Published 2018-07-10 01:11

All of the test code below is written in Scala and built with sbt.

build.sbt dependencies

name := "spark-demo"
version := "1.0"
scalaVersion := "2.11.8"
val sparkVersion = "2.1.1"

libraryDependencies ++= Seq(
  // Spark core and SQL
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // ES-Hadoop connector for Spark 2.x; exclude its transitive Spark
  // dependency so the versions declared above win
  ("org.elasticsearch" %% "elasticsearch-spark-20" % "6.0.0").excludeAll(
    ExclusionRule(organization = "org.apache.spark")
  )
)

Reading and writing ES with Spark SQL

/**
  * @author created by LXF on 2018/6/1 10:03
  */

import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._

object App {

  case class Person(age: Long, name: String)

  def main(args: Array[String]): Unit = {

    println("Hello World!")

    System.setProperty("hadoop.home.dir", "G:\\hadoop_home")

    val spark = SparkSession.builder()
      .appName("SparkTest")
      .master("local[*]")
      .config("es.index.auto.create", "true") // create the target index if missing
      .config("pushdown", "true")             // push filters down into the ES query
      .config("es.nodes", "192.168.7.130")
      .config("es.port", "9200")
      .config("es.nodes.wan.only", "true")    // connect only to the declared nodes
      .getOrCreate()

    import spark.implicits._
    // Read documents of the form {age: ..., name: ...} from ES
    val sparkDF = spark.sqlContext.read
      .format("org.elasticsearch.spark.sql")
      .option("inferSchema", "true")
      .load("test_lxf")
      .as[Person]

    //    sparkDF.take(10).foreach(println(_))

    // Write back to ES. The records must keep this shape (a case class or map),
    // because that is what carries the field names into ES.
    val rdd = sparkDF.rdd

    rdd.saveToEs("index/external") // resource is given as "index/type"

    spark.stop()

  }

}
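
The example above drops to an RDD before writing, but the connector also exposes a Dataset-level API. A minimal sketch, assuming the same sparkDF as above (the test_lxf_copy index name is hypothetical):

import org.elasticsearch.spark.sql._

// Write the Dataset straight to ES; the resource is "index/type" as in 6.x
sparkDF.saveToEs("test_lxf_copy/doc")

// Equivalent DataFrameWriter form
sparkDF.write
  .format("org.elasticsearch.spark.sql")
  .mode("append")
  .save("test_lxf_copy/doc")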

Reading ES with a Spark RDD

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object LoadElasticsearchData {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("e2e.computing.test")
        .setMaster("local[*]")
        .set("es.nodes", "192.168.7.130")
        .set("es.port", "9200")
        .set("es.index.auto.create", "true")
        .set("es.mapping.date.rich", "false") // keep date fields as plain strings
    )

    // esRDD reads from the test_lxf index; without a query string,
    // elasticsearch-spark fetches all documents by default
    val query =
      """
        |{
        |  "query": {
        |    "match_all": {}
        |  }
        |}
      """.stripMargin

    val esRdd = sc.esRDD("test_lxf", query)
  }
}
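
esRDD returns each document as a pair of (_id, field map), so the results can be inspected directly. A quick sketch, assuming the name and age fields from the earlier index:

// esRdd has type RDD[(String, Map[String, AnyRef])]: document _id plus its fields
esRdd.take(10).foreach { case (id, fields) =>
  println(s"id=$id name=${fields.getOrElse("name", "")} age=${fields.getOrElse("age", "")}")
}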

Writing ES with a Spark RDD

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

/**
  * @author LXF
  *         2018/3/28 15:29
  */
object SaveElasticsearch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setAppName("e2e.computing.test")
        .setMaster("local[*]")
        .set("es.nodes", "192.168.7.130")
        .set("es.port", "9200")
        .set("es.index.auto.create", "true")
        .set("es.mapping.date.rich", "false")
    )
    
    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
    // The index is created automatically if it does not exist
    sc.makeRDD(Seq(airports)).saveToEs("test_lxf2")
  }
}
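
By default saveToEs lets ES generate the document ids, so re-running the job appends duplicate documents. The connector can instead take the id from a field via es.mapping.id; a minimal sketch, where the code field is a hypothetical key of our own:

// Hypothetical: use the "code" field as the document _id so re-runs
// overwrite the same documents instead of appending new ones
val docs = Seq(
  Map("code" -> "OTP", "name" -> "Otopeni"),
  Map("code" -> "SFO", "name" -> "San Fran")
)
sc.makeRDD(docs).saveToEs("test_lxf2", Map("es.mapping.id" -> "code"))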
