使用Sqoop导入关系型数据库中的数据
数据的交换和集成
使用Sqoop进行HDFS和RDBMS数据的交换
什么是Sqoop?
SQL-to-HDFS工具
利用JDBC链接关系型数据库
Sqoop的获取(伪分布)
#mkdir tools
#cd tools
#tar -zxvf sqoop-1.4.5.bin__hadoop-0.23.tar.gz -C ~/training/
#cd ~/training
# cd sqoop-1.4.5.bin__hadoop-0.23/
# pwd
/root/training/sqoop-1.4.5.bin__hadoop-0.23
#vi ~/.bash_profile
export SQOOP_HOME=/root/training/sqoop-1.4.5.bin__hadoop-0.23
export PATH=$SQOOP_HOME/bin:$PATH
#source ~/.bash_profile
导入Oracle数据库表中指定的列
#sqoop import --connect jdbc:oracle:thin:@ip:1521:orcl --username scott --password tiger --table emp --columns 'empno,ename,sal' -m 1
#hdfs dfs -ls /
导入Oracle数据库表中指定的列,并且指定分隔符和HDFS的路径
#sqoop import --connect jdbc:oracle:thin:@ip:1521:orcl --username scott --password tiger --table emp --columns 'empno,ename,sal' -m 1 --target-dir '/sqoop/data1' --fields-terminated-by '**'
#hdfs dfs -lsr /sqoop/data1
导入Oracle数据库表中的数据,并使用query语句
#sqoop import --connect jdbc:oracle:thin:@ip:1521:orcl --username scott --password tiger --query 'select * from emp where deptno=10 and $CONDITINOS' -m 1 --target-dir '/sqoop/data2'
使用Apache Flume采集数据
什么是Flume?
-Apache Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力
cd tools
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C ~/training/
cd ~/training
cd apache-flume-1.6.0-bin/
cd conf
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=/root/training/jdk1.8.0_144
cd ..
mkdir /root/training/mylogs
mkdir myagent
cd myagent
vi a3.conf
#bin/flume-ng agent -n a3 -f myagent/a3.conf -c conf -Dflume.root.logger=INFO,console
#定义agent名,source、sink的名称
a3.sources = r1
a3.channels =c1
a3.sinks = k1
#具体定义source
a3.sources.r1.type = spooldir
a3.sources.r1.spoolDir = /root/training/mylogs
#具体定义channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#具体定义sink
a3.sinks.k1.type = logger
#组装source、channel、sink
a3.sources.r1.channels = c1a3.sinks.k1.channel = c1
a3.sinks.k1.channel = c1
#cd ..
#bin/flume-ng agent -n a3 -f myagent/a3.conf -c conf -Dflume.root.logger=INFO,console
cd training
vi a.txt
mv a.txt mylogs/
~/training/apache-flume-1.6.0-bin/
cd myagentls
vi a4.conf
#定义agent名,source、sink的名称
a4.sources = r1
a4.channels =c1
a4.sinks = k1
#具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /root/training/mylogs
#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
#定义拦截器,为消息添加时间戳
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
#具体定义sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs:192.168.56.102:9000/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
#不按照条数生成文件
a4.sinks.k1.hdfs.rollCount = 0
#HDFS上的问价能达到128M时生成一个文件
a4.sinks.k1.hdfs.rollSize = 134217728
#HFDS上的文件达到60秒生成一个文件
a4.sinks.k1.hdfs.rollInterval = 60
#组装source、channel、sink
a4.sources.r1.channels =c1
a4.sinks.k1.channel = c1
cd ..
bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
ls mylogs
vi a.txt
mv a.txt mylogs/
ls mylogs
hdfs dfs -ls
vi a.txt
mv a.txt mylogs
hdfs dfs -ls /
hdfs dfs -lsr /flume
hdfs dfs -cat
HDFS与Apache Kafka
什么是Apache Kafka?
Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务

image.png
启动zookeeper集群
#cd tools
# tar -zxvf kafka_2.9.2-0.8.1.1.tgz -C ~/training/
#cd ~/training
# cd kafka_2.9.2-0.8.1.1/
# mkdir -p logs/broker1
# mkdir -p logs/broker2
cd logs/broker1
#pwd
/root/training/kafka_2.9.2-0.8.1.1/logs/broker1
#cd ../../config
# vi server.properties
log.dirs=/root/training/kafka_2.9.2-0.8.1.1/logs/broker1
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
# cp server.properties server1.properties
#vi server1.properties
broker.id=1
port=9093
#cd ..
#jps
# bin/kafka-server-start.sh config/server.properties &
# bin/kafka-server-start.sh config/server2.properties &
# vi config/server1.properties
log.dirs=/root/training/kafka_2.9.2-0.8.1.1/logs/broker2
# bin/kafka-server-start.sh config/server2.properties &
#jps
17413 Jps
17240 Kafka
17374 Kafka
# bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181 --replication-factor 1 --partitions 3 --topic mydemo2
复制一个,当前的命名为消费者,另一个命名为生产者

image.png
生产者
# cd training/kafka_2.9.2-0.8.1/
# ls bin/
# bin/kafka-console-producer.sh --broker-list 192.168.56.102:9092 --topic mydemo2
消费者
# bin/kafka-console-consumer.sh --zookeeper 192.168.56.11:2181 --topic mydemo2
生产者
HelloWorld

image.png

image.png
Apache Kafka与Hadoop HDFS的集成
对于一个实时订阅的系统来说,可以通过Kafka将实时处理和监控的数据加载到Hadoop的HDFS中,或者NoSQL数据库中,或者数据仓库中。
Kafka提供了Hadoop Producer和Consumer用于集成Hadoop

image.png
KafkaHDFSConsumer.java
package demo;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/*
* 完成两件事情:
* 1、作为Kafka的消费者读取Topic中的信息
* 2、将信息写入HDFS
*/
public class KafkaHDFSConsumer extends Thread{
//指定具体topic名称
private String topic;
public KafkaHDFSConsumer(String topic) {
this.topic = topic;
}
public void run() {
//构造一个consumer的对象
ConsumerConnector consumer = createConsumer();
//构造一个Map对象,代表topic
Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
topicCountMap.put(this.topic, 1);
//构造一个输入流
Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
//获取每次接收到的具体数据
KafkaStream<byte[],byte[]> stream = messageStreams.get(this.topic).get(0);
ConsumerIterator<byte[],byte[]> iterator = stream.iterator();
while(iterator.hasNext()) {
String message = new String(iterator.next().message());
//打印
System.out.println("接收消息:" + message);
//将消息写入HDFS
try {
HDFSUtils.sendMessageToHDFS("/kafka/data1.txt", message);
}catch (Exception e) {
e.printStackTrace();
}
}
}
//创建一个Consumer
private ConsumerConnector createConsumer() {
Properties prop = new Properties();
prop.put("zookeeper.connect", "192.168.56.11:2181,192.168.56.12:2181,19.168.56.13:2181");
//申明一个消费组
prop.put("group.id", "group1");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
}
public static void main(String[] args) {
new KafkaHDFSConsumer("mydemo").start();
}
}
HDFSUtils
package demo;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HDFSUtils {
//定义一个文件系统的对象
private static FileSystem fs;
static {
Configuration conf = new Configuration();
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
try {
fs = FileSystem.get(new URI("hdfs://192.168.56.102:9000"),conf);
}catch (Exception e) {
e.printStackTrace();
}
}
public static void sendMessageToHDFS(String filename,String data) throws Exception{
OutputStream out = null;
if(!fs.exists(new Path(filename))) {
//创建文件
out = fs.create(new Path(filename));
}else {
//如果存在,则追加
out = fs.append(new Path(filename));
}
//将数据写入HDFS
out.write(data.getBytes());
out.close();
}
}
#bin/kafka-console-producer.sh --broker-list 192.168.56.102:9092 --topic mydemo2
管理和监控MapReduce任务
yarn application -list //查看正在运行的任务
yarn application -list -appState<State> //查看集群中某个状态的所有任务
yarn application -status <application_ID> //查看单个任务的状态信息
Yarn的资源管理和调度
资源池
用户提交的作业将会放进一个能够公平共享资源的Pool(池)中。
系统资源不会与某个具体的池绑定
资源池可以预先定义:也可以在提交任务的时候,动态指定资源池的名称
资源池及其子集,在fair-scheduler.xml中定义
三种方式:
FIFO:基于队列的FIFO调度器(MapReduce 1.0)
Capacity Scheduler:容器调度器
Fair Scheduler:公平调度器
为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)

image.png

image.png
配置Fair Scheduler
Fair Scheduler的配置选项包括两部分
其中一部分在yarn-site.xml中,主要用于配置调度器级别的参数
另外一部分在一个自定义配置文件(默认是fair-scheduler.xml)中,主要用于配置各个队列的资源量、权重等信息
网友评论