美文网首页
51cto赵强HADOOP学习(八)

51cto赵强HADOOP学习(八)

作者: lehuai | 来源:发表于2017-12-14 15:39 被阅读0次

使用Sqoop导入关系型数据库中的数据

数据的交换和集成

使用Sqoop进行HDFS和RDBMS数据的交换
什么是Sqoop?
SQL-to-HDFS工具
利用JDBC链接关系型数据库
Sqoop的获取(伪分布)
#mkdir tools
#cd tools
#tar -zxvf sqoop-1.4.5.bin__hadoop-0.23.tar.gz -C ~/training/
#cd ~/training
# cd sqoop-1.4.5.bin__hadoop-0.23/
# pwd
/root/training/sqoop-1.4.5.bin__hadoop-0.23
#vi ~/.bash_profile
export SQOOP_HOME=/root/training/sqoop-1.4.5.bin__hadoop-0.23

export PATH=$SQOOP_HOME/bin:$PATH
#source ~/.bash_profile

导入Oracle数据库表中指定的列

#sqoop import --connect jdbc:oracle:thin:@ip:1521:orcl --username scott --password tiger --table emp --columns 'empno,ename,sal' -m 1
#hdfs dfs -ls /

导入Oracle数据库表中指定的列,并且指定分隔符和HDFS的路径

#sqoop import --connect jdbc:oracle:thin:@ip:1521:orcl --username scott --password tiger --table emp --columns 'empno,ename,sal' -m 1 --target-dir '/sqoop/data1' --fields-terminated-by '**'
#hdfs dfs -lsr /sqoop/data1

导入Oracle数据库表中的数据,并使用query语句

#sqoop import --connect jdbc:oracle:thin:@ip:1521:orcl --username scott --password tiger --query 'select * from emp where deptno=10 and $CONDITINOS' -m 1 --target-dir '/sqoop/data2'

使用Apache Flume采集数据

什么是Flume?

-Apache Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力

cd tools
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C ~/training/
cd ~/training
cd apache-flume-1.6.0-bin/
cd conf
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=/root/training/jdk1.8.0_144 
cd ..
mkdir /root/training/mylogs
mkdir myagent
cd myagent
vi a3.conf
#bin/flume-ng agent -n a3 -f myagent/a3.conf -c conf -Dflume.root.logger=INFO,console
#定义agent名,source、sink的名称
a3.sources = r1
a3.channels =c1
a3.sinks = k1

#具体定义source
a3.sources.r1.type = spooldir
a3.sources.r1.spoolDir = /root/training/mylogs

#具体定义channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#具体定义sink
a3.sinks.k1.type = logger

#组装source、channel、sink
a3.sources.r1.channels = c1a3.sinks.k1.channel = c1
a3.sinks.k1.channel = c1
#cd ..
#bin/flume-ng agent -n a3 -f myagent/a3.conf -c conf -Dflume.root.logger=INFO,console
cd training
vi a.txt
mv a.txt mylogs/
~/training/apache-flume-1.6.0-bin/
cd myagentls
vi a4.conf
#定义agent名,source、sink的名称
a4.sources = r1
a4.channels =c1
a4.sinks = k1

#具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /root/training/mylogs

#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

#定义拦截器,为消息添加时间戳
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder


#具体定义sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs:192.168.56.102:9000/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a4.sinks.k1.hdfs.rollCount = 0
#HDFS上的问价能达到128M时生成一个文件
a4.sinks.k1.hdfs.rollSize = 134217728
#HFDS上的文件达到60秒生成一个文件
a4.sinks.k1.hdfs.rollInterval = 60

#组装source、channel、sink
a4.sources.r1.channels =c1
a4.sinks.k1.channel = c1
cd ..
bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console

ls mylogs
vi a.txt
mv a.txt mylogs/
ls mylogs
hdfs dfs -ls
vi a.txt
mv a.txt mylogs
hdfs dfs -ls /
hdfs dfs -lsr /flume
hdfs dfs -cat 

HDFS与Apache Kafka

什么是Apache Kafka?

Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务
image.png

启动zookeeper集群

#cd tools
# tar -zxvf kafka_2.9.2-0.8.1.1.tgz -C ~/training/
#cd ~/training
# cd kafka_2.9.2-0.8.1.1/
# mkdir -p logs/broker1
# mkdir -p logs/broker2
cd logs/broker1
#pwd
/root/training/kafka_2.9.2-0.8.1.1/logs/broker1
#cd ../../config
# vi server.properties 
log.dirs=/root/training/kafka_2.9.2-0.8.1.1/logs/broker1
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
# cp server.properties server1.properties
#vi server1.properties 
broker.id=1
port=9093
#cd ..
#jps
# bin/kafka-server-start.sh config/server.properties &
# bin/kafka-server-start.sh config/server2.properties &
# vi config/server1.properties 
log.dirs=/root/training/kafka_2.9.2-0.8.1.1/logs/broker2
# bin/kafka-server-start.sh config/server2.properties &
#jps
17413 Jps
17240 Kafka
17374 Kafka
# bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181 --replication-factor 1 --partitions 3 --topic mydemo2

复制一个,当前的命名为消费者,另一个命名为生产者

image.png

生产者

# cd training/kafka_2.9.2-0.8.1/
# ls bin/
# bin/kafka-console-producer.sh --broker-list 192.168.56.102:9092 --topic mydemo2

消费者

# bin/kafka-console-consumer.sh --zookeeper 192.168.56.11:2181 --topic mydemo2

生产者

HelloWorld
image.png
image.png

Apache Kafka与Hadoop HDFS的集成

对于一个实时订阅的系统来说,可以通过Kafka将实时处理和监控的数据加载到Hadoop的HDFS中,或者NoSQL数据库中,或者数据仓库中。

Kafka提供了Hadoop Producer和Consumer用于集成Hadoop

image.png

KafkaHDFSConsumer.java

package demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


/*
 * 完成两件事情:
 * 1、作为Kafka的消费者读取Topic中的信息
 * 2、将信息写入HDFS
 */

public class KafkaHDFSConsumer extends Thread{
    
    //指定具体topic名称
    private String topic;
    
    public KafkaHDFSConsumer(String topic) {
        this.topic = topic;
        
    }
    
    public void run() {
        //构造一个consumer的对象
        ConsumerConnector consumer = createConsumer();
        
        //构造一个Map对象,代表topic
        Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
        topicCountMap.put(this.topic, 1);
        
        //构造一个输入流
        Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        
        //获取每次接收到的具体数据
        KafkaStream<byte[],byte[]> stream = messageStreams.get(this.topic).get(0);
        
        ConsumerIterator<byte[],byte[]> iterator = stream.iterator();
        while(iterator.hasNext()) {
            String message = new String(iterator.next().message());
            //打印
            System.out.println("接收消息:" + message);
            
            //将消息写入HDFS
            try {
                HDFSUtils.sendMessageToHDFS("/kafka/data1.txt", message);
            }catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }

    //创建一个Consumer
    private ConsumerConnector createConsumer() {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "192.168.56.11:2181,192.168.56.12:2181,19.168.56.13:2181");
        
        //申明一个消费组
        prop.put("group.id", "group1");
        
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    }

    public static void main(String[] args) {
        new KafkaHDFSConsumer("mydemo").start();
    

    }

}

HDFSUtils

package demo;

import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSUtils {
    
    //定义一个文件系统的对象
    private static FileSystem fs;
    
    static {
        Configuration conf = new Configuration();
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        
        try {
            fs = FileSystem.get(new URI("hdfs://192.168.56.102:9000"),conf);
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void sendMessageToHDFS(String filename,String data) throws Exception{
        OutputStream out = null;
        
        if(!fs.exists(new Path(filename))) {
            //创建文件
            out = fs.create(new Path(filename));
        }else {
            //如果存在,则追加
            out = fs.append(new Path(filename));
        }
        
        //将数据写入HDFS
        out.write(data.getBytes());
        out.close();
        
    }

}

#bin/kafka-console-producer.sh --broker-list 192.168.56.102:9092 --topic mydemo2

管理和监控MapReduce任务

yarn application -list  //查看正在运行的任务
yarn application -list -appState<State>  //查看集群中某个状态的所有任务
yarn application -status <application_ID>  //查看单个任务的状态信息

Yarn的资源管理和调度

资源池

用户提交的作业将会放进一个能够公平共享资源的Pool(池)中。
系统资源不会与某个具体的池绑定
资源池可以预先定义:也可以在提交任务的时候,动态指定资源池的名称
资源池及其子集,在fair-scheduler.xml中定义

三种方式:

FIFO:基于队列的FIFO调度器(MapReduce 1.0)

Capacity Scheduler:容器调度器

Fair Scheduler:公平调度器
为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)
image.png
image.png

配置Fair Scheduler

Fair Scheduler的配置选项包括两部分

其中一部分在yarn-site.xml中,主要用于配置调度器级别的参数
另外一部分在一个自定义配置文件(默认是fair-scheduler.xml)中,主要用于配置各个队列的资源量、权重等信息

相关文章

网友评论

      本文标题:51cto赵强HADOOP学习(八)

      本文链接:https://www.haomeiwen.com/subject/gikkwxtx.html