0%

由于历史原因,hadoop集群中的机器的磁盘空间的大小各不相同,而HDFS在进行写入操作时,并没有考虑到这种情况,所以随着数据量的逐渐增加,磁盘较小的datanode机器上的磁盘空间很快将被写满,从而触发了报警。
此时,不得不手工执行start-balancer.sh来进行balance操作,即使将dfs.balance.bandwidthPerSec 参数设置为10M/s,整个集群达到平衡也需要很长的时间,所以写了个crontab来每天凌晨来执行start-balancer.sh,由于此时集群不平衡的状态还没有那么严重,所以start-balancer.sh很快执行结束了。

不要在namenode中执行banlance指令

由于HDFS需要启动单独的Rebalance Server来执行Rebalance操作,所以尽量不要在NameNode上执行start-balancer.sh,而是找一台比较空闲的机器。

hadoop balance工具的用法:

开始: bin/start-balancer.sh [-threshold <threshold>]

停止: bin/stop-balancer.sh

影响hadoop balance工具的几个参数:

  • threshold
    默认设置:10,参数取值范围:0-100,参数含义:判断集群是否平衡的目标参数,每一个 datanode 存储使用率和集群总存储使用率的差值都应该小于这个阀值 ,理论上,该参数设置的越小,整个集群就越平衡,但是在线上环境中,hadoop集群在进行balance时,还在并发的进行数据的写入和删除,所以有可能无法到达设定的平衡参数值。

  • setBalancerBandwidth
    可以使用如下命令修改 balancer宽带参数
    hdfs dfsadmin -setBalancerBandwidth 67108864
    默认设置:1048576(1 M/S),参数含义:设置balance工具在运行中所能占用的带宽,设置的过大可能会造成mapred运行缓慢

命令 (未验证)

1
hdfs balancer -Dfs.defaultFS=hdfs://<NN_HOSTNAME>:8020 -Ddfs.balancer.movedWinWidth=5400000 -Ddfs.balancer.moverThreads=1000 -Ddfs.balancer.dispatcherThreads=200 -Ddfs.datanode.balance.max.concurrent.moves=5 -Ddfs.balance.bandwidthPerSec=100000000 -Ddfs.balancer.max-size-to-move=10737418240 -threshold 5

参考资料

hadoop command guide
Hadoop集群datanode磁盘不均衡的解决方案

查看文件备份数

1
2
3
4
[root@VLNX107011 hue]# hdfs dfs -ls /facishare-data/flume/test
Found 2 items
drwxr-xr-x - fsdevops fsdevops 0 2015-11-18 11:09 /facishare-data/flume/test/2015
-rw-r--r-- 2 fsdevops fsdevops 33 2015-11-30 17:49 /facishare-data/flume/test/nohup.out

结果行中的第2列是备份系数(注:文件夹信息存储在namenode节点上,没有备份,故文件夹的备份系数是横杠)

查看集群平均备份数

通过hadoop fsck /可以方便的看到Average block replication的值,这个值不一定会与Default replication factor相等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@VLNX107011 hue]# hdfs fsck /
Connecting to namenode via http://VLNX107011:50070
FSCK started by root (auth:SIMPLE) from /VLNX107011 for path / at Fri Dec 04 19:08:09 CST 2015
...................
Total size: 11837043630 B (Total open files size: 166 B)
Total dirs: 3980
Total files: 3254
Total symlinks: 0 (Files currently being written: 2)
Total blocks (validated): 2627 (avg. block size 4505916 B) (Total open file blocks (not validated): 2)
Minimally replicated blocks: 2627 (100.0 %)
Over-replicated blocks: 2253 (85.76323 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 3
Average block replication: 2.9798248
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)
Number of data-nodes: 10
Number of racks: 1
FSCK ended at Fri Dec 04 19:08:09 CST 2015 in 100 milliseconds
The filesystem under path '/' is HEALTHY

可以看到Average block replication, Corrupt blocks, Missing replicas等信息。

修改备份系数

1
2
3
4
5
6
7
8
[root@VLNX107011 hue]# hdfs dfs -setrep -w 3 -R /
Replication 3 set: /user/oozie/share/lib/lib_20151103160704/hive/jetty-all-7.6.0.v20120127.jar
Replication 3 set: /user/oozie/share/lib/lib_20151103160704/hive/jline-2.11.jar
......
Waiting for /backup/gitlab/1447133001_gitlab_backup.tar ........... done
Waiting for /backup/gitlab/1447133732_gitlab_backup.tar ... done
Waiting for /backup/gitlab/1447180217_gitlab_backup.tar ... done
......

可以看到HDFS对所有文件的备份系数进行了刷新。

再次检查刚才文件的备份系数,可以看到从2变为3。

1
2
3
4
[root@VLNX107011 hue]# hdfs dfs -ls /facishare-data/flume/test
Found 2 items
drwxr-xr-x - fsdevops fsdevops 0 2015-11-18 11:09 /facishare-data/flume/test/2015
-rw-r--r-- 3 fsdevops fsdevops 33 2015-11-30 17:49 /facishare-data/flume/test/nohup.out

WARNING
将备份系数从低到高比较容易,但从高到低会特别慢,所以在集群搭建初始就要规划好Default replication factor。
通常备份系数不需要太高,可以是服务器总量的1/3左右即可,Hadoop默认的数值是3。

参考

HDFS修改备份系数和动态增删节点

命令基本格式:

1
hadoop fs -cmd < args >

1. ls 列出文件列表

1
hadoop fs -ls  /

列出hdfs文件系统根目录下的目录和文件

1
hadoop fs -ls -R /

列出hdfs文件系统所有的目录和文件

2. put 上传文件

1
hadoop fs -put < local file > < hdfs file >

hdfs file的父目录一定要存在,否则命令不会执行

1
hadoop fs -put  < local file or dir >...< hdfs dir >

hdfs dir 一定要存在,否则命令不会执行

1
hadoop fs -put - < hdsf  file>

从键盘读取输入到hdfs file中,按Ctrl+D结束输入,hdfs file不能存在,否则命令不会执行

2.1. moveFromLocal

1
hadoop fs -moveFromLocal  < local src > ... < hdfs dst >

与put相类似,命令执行后源文件 local src 被删除,也可以从从键盘读取输入到hdfs file中

2.2. copyFromLocal

1
hadoop fs -copyFromLocal  < local src > ... < hdfs dst >

与put相类似,也可以从从键盘读取输入到hdfs file中

3. get 获取hdfs文件

1
hadoop fs -get < hdfs file > < local file or dir>

local file不能和 hdfs file名字不能相同,否则会提示文件已存在,没有重名的文件会复制到本地

1
hadoop fs -get < hdfs file or dir > ... < local  dir >

拷贝多个文件或目录到本地时,本地要为文件夹路径
注意:如果用户不是root, local 路径要为用户文件夹下的路径,否则会出现权限问题,

3.1. moveToLocal

当前版本中还未实现此命令

3.2. copyToLocal

1
hadoop fs -copyToLocal < local src > ... < hdfs dst >

与get相类似

4. rm 删除文件

1
2
hadoop fs -rm < hdfs file > ...
hadoop fs -rm -r < hdfs dir>...

每次可以删除多个文件或目录

5. mkdir 创建

1
hadoop fs -mkdir < hdfs path>

只能一级一级的建目录,父目录不存在的话使用这个命令会报错

1
hadoop fs -mkdir -p < hdfs path>

所创建的目录如果父目录不存在就创建该父目录

6. getmerge 下载并合并文件

1
hadoop fs -getmerge < hdfs dir >  < local file >

将hdfs指定目录下所有文件排序后合并到local指定的文件中,文件不存在时会自动创建,文件存在时会覆盖里面的内容

1
hadoop fs -getmerge -nl  < hdfs dir >  < local file >

加上nl后,合并到local file中的hdfs文件之间会空出一行

7. cp hdfs间复制

1
hadoop fs -cp  < hdfs file >  < hdfs file >

目标文件不能存在,否则命令不能执行,相当于给文件重命名并保存,源文件还存在

1
hadoop fs -cp < hdfs file or dir >... < hdfs dir >

目标文件夹要存在,否则命令不能执行

8. mv hdfs间移动

1
hadoop fs -mv < hdfs file >  < hdfs file >

目标文件不能存在,否则命令不能执行,相当于给文件重命名并保存,源文件不存在

1
hadoop fs -mv  < hdfs file or dir >...  < hdfs dir >

源路径有多个时,目标路径必须为目录,且必须存在。
注意:跨文件系统的移动(local到hdfs或者反过来)都是不允许的

9. count 统计数据

1
hadoop fs -count < hdfs path >

统计hdfs对应路径下的目录个数,文件个数,文件总计大小
显示为目录个数,文件个数,文件总计大小,输入路径

10. du

1
hadoop fs -du < hdsf path>

显示hdfs对应路径下每个文件夹和文件的大小

1
hadoop fs -du -s < hdsf path>

显示hdfs对应路径下所有文件和(sum)的大小

1
hadoop fs -du -h < hdsf path>

显示hdfs对应路径下每个文件夹和文件的大小,文件的大小用方便阅读的形式表示,例如用64M代替67108864

10.1 文件排序

1
hadoop fs -du /user/mls_3.3 | sort -rn

显示 /user/mls_3.3路径下的文件并按大小排序

11. text

1
hadoop fs -text < hdsf file>

将文本文件或某些格式的非文本文件通过文本格式输出

12. setrep

1
hadoop fs -setrep -R 3 < hdfs path >

改变一个文件在hdfs中的副本个数,上述命令中数字3为所设置的副本个数,-R选项可以对一个人目录下的所有目录+文件递归执行改变副本个数的操作

13. stat 获取文件信息

1
hdoop fs -stat [format] < hdfs path >

返回对应路径的状态信息
[format]可选参数有:%b(文件大小),%o(Block大小),%n(文件名),%r(副本个数),%y(最后一次修改日期和时间)
可以这样书写hadoop fs -stat %b%o%n < hdfs path >,不过不建议,这样每个字符输出的结果不是太容易分清楚

14. tail

1
hadoop fs -tail < hdfs file >

在标准输出中显示文件末尾的1KB数据

15. archive

1
hadoop archive -archiveName name.har -p < hdfs parent dir > < src >* < hdfs dst >

命令中参数name:压缩文件名,自己任意取;< hdfs parent dir > :压缩文件所在的父目录;< src >:要压缩的文件名;< hdfs dst >:压缩文件存放路径
*示例:hadoop archive -archiveName hadoop.har -p /user 1.txt 2.txt /des
示例中将hdfs中/user目录下的文件1.txt,2.txt压缩成一个名叫hadoop.har的文件存放在hdfs中/des目录下,如果1.txt,2.txt不写就是将/user目录下所有的目录和文件压缩成一个名叫hadoop.har的文件存放在hdfs中/des目录下
显示har的内容可以用如下命令:

1
hadoop fs -ls /des/hadoop.jar

显示har压缩的是那些文件可以用如下命令

1
hadoop fs -ls -R har:///des/hadoop.har

注意:har文件不能进行二次压缩。如果想给.har加文件,只能找到原来的文件,重新创建一个。har文件中原来文件的数据并没有变化,har文件真正的作用是减少NameNode和DataNode过多的空间浪费。

16. balancer

1
hdfs balancer

如果管理员发现某些DataNode保存数据过多,某些DataNode保存数据相对较少,可以使用上述命令手动启动内部的均衡过程

17. dfsadmin

1
hdfs dfsadmin -help

管理员可以通过dfsadmin管理HDFS,用法可以通过上述命令查看
hdfs dfsadmin -report

显示文件系统的基本数据

1
hdfs dfsadmin -safemode < enter | leave | get | wait >

enter:进入安全模式;leave:离开安全模式;get:获知是否开启安全模式;
wait:等待离开安全模式

18. distcp

用来在两个HDFS之间拷贝数据

提交MapReduce作业时 肯定看过如下的输出:

17/04/17 14:00:38 INFO mapreduce.Job: Running job: job_1472052053889_0001
17/04/17 14:00:48 INFO mapreduce.Job: Job job_1472052053889_0001 running in uber mode : false
17/04/17 14:00:48 INFO mapreduce.Job: map 0% reduce 0%
17/04/17 14:00:58 INFO mapreduce.Job: map 100% reduce 0%
17/04/17 14:01:04 INFO mapreduce.Job: map 100% reduce 100%

注意上面日志的第二行,显示job_1472052053889_0001不是以uber模式运行的。

什么是uber模式

该模式是2.x开始引入的;以Uber模式运行MR作业,所有的Map Tasks和Reduce Tasks将会在ApplicationMaster所在的容器(container)中运行,即

整个MR作业运行的过程只会启动AM container,

因为不需要启动mapper 和 reducer containers,所以AM不需要和远程containers通信,整个过程简单了。

uber模式一般用处

如果我们的MR作业输入的数据量非常小,启动Map container或Reduce container的时间都比处理数据要长,那么这个作业就可以考虑启用Uber模式运行,一般情况下,对小作业启用Uber模式运行会得到2x-3x的性能提升。

如何启用Uber模式

启用uber模式的要求非常严格,代码如下:

isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks && smallInput && smallMemory && smallCpu && notChainJob && isValidUberMaxReduces;

  • uberEnabled:其实就是 mapreduce.job.ubertask.enable 参数的值,默认情况下为 false ;也就是说默认情况不启用Uber模式;
  • smallNumMapTasks:启用Uber模式的作业Map的个数必须小于等于 mapreduce.job.ubertask.maxmaps 参数的值,该值默认为9;也计算说,在默认情况下,如果你想启用Uber模式,作业的Map个数必须小于10;
  • smallNumReduceTasks:同理,Uber模式的作业Reduce的个数必须小于等于mapreduce.job.ubertask.maxreduces,该值默认为1;也计算说,在默认情况下,如果你想启用Uber模式,作业的Reduce个数必须小于等于1;
  • smallInput:不是任何作业都适合启用Uber模式的,输入数据的大小必须小于等于 mapreduce.job.ubertask.maxbytes 参数的值,默认情况是HDFS一个文件块大小;
  • smallMemory:因为作业是在AM所在的container中运行,所以要求我们设置的Map内存(mapreduce.map.memory.mb)和Reduce内存(mapreduce.reduce.memory.mb)必须小于等于 AM所在容器内存大小设置(yarn.app.mapreduce.am.resource.mb);
  • smallCpu:同理,Map配置的vcores(mapreduce.map.cpu.vcores)个数和 Reduce配置的vcores(mapreduce.reduce.cpu.vcores)个数也必须小于等于AM所在容器vcores个数的设置(yarn.app.mapreduce.am.resource.cpu-vcores);
  • notChainJob:此外,处理数据的Map class(mapreduce.job.map.class)和Reduce class(mapreduce.job.reduce.class)必须不是 ChainMapper 或 ChainReducer 才行;
  • isValidUberMaxReduces:目前仅当Reduce的个数小于等于1的作业才能启用Uber模式。

同时满足上面八个条件才能在作业运行的时候启动Uber模式。下面是一个启用Uber模式运行的作业运行成功的日志:

File System Counters FILE: Number of bytes read=215 FILE: Number of bytes written=505 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=1200 HDFS: Number of bytes written=274907 HDFS: Number of read operations=57 HDFS: Number of large read operations=0 HDFS: Number of write operations=11 Job Counters Launched map tasks=2 Launched reduce tasks=1 Other local map tasks=2 Total time spent by all maps in occupied slots (ms)=3664 Total time spent by all reduces in occupied slots (ms)=2492 TOTAL_LAUNCHED_UBERTASKS=3 NUM_UBER_SUBMAPS=2 NUM_UBER_SUBREDUCES=1 Map-Reduce Framework Map input records=2 Map output records=8 Map output bytes=82 Map output materialized bytes=85 Input split bytes=202 Combine input records=8 Combine output records=6 Reduce input groups=5 Reduce shuffle bytes=0 Reduce input records=6 Reduce output records=5 Spilled Records=12 Shuffled Maps =0 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=65 CPU time spent (ms)=1610 Physical memory (bytes) snapshot=1229729792 Virtual memory (bytes) snapshot=5839392768 Total committed heap usage (bytes)=3087532032 File Input Format Counters Bytes Read=50 File Output Format Counters Bytes Written=41

细心的同学应该会发现里面多了 TOTAL_LAUNCHED_UBERTASKS、NUM_UBER_SUBMAPS 以及 NUM_UBER_SUBREDUCES 信息,以前需要启用Map Task 或 Reduce Task运行的工作直接在AM中运行,所有出现了NUM_UBER_SUBMAPS和原来Map Task个数一样;同理,NUM_UBER_SUBREDUCES 和Reduce Task个数一样。

主要表

表名 说明 关联键
TBLS 所有hive表的基本信息 TBL_ID,SD_ID
TABLE_PARAM 表级属性,如是否外部表,表注释等 TBL_ID
COLUMNS Hive表字段信息(字段注释,字段名,字段类型,字段序号) SD_ID
SDS 所有hive表、表分区所对应的hdfs数据目录和数据格式 SD_ID,SERDE_ID
SERDE_PARAM 序列化反序列化信息,如行分隔符、列分隔符、NULL的表示字符等 SERDE_ID
PARTITIONS Hive表分区信息 PART_ID,SD_ID,TBL_ID
PARTITION_KEYS Hive分区表分区键 TBL_ID
PARTITION_KEY_VALS Hive表分区名(键值) PART_ID

从上面表的内容来看,hive整个创建表的过程已经比较清楚了。
(1)解析用户提交hive语句,对其进行解析,分解为表、字段、分区等hive对象
(2)根据解析到的信息构建对应的表、字段、分区等对象,从SEQUENCE_TABLE中获取构建对象的最新ID,与构建对象信息(名称,类型等)一同通过DAO方法写入到元数据表中去,成功后将SEQUENCE_TABLE中对应的最新ID+5。

表介绍

1、SEQUENCE_TABLE

对于db、tbl、sds等的SEQUENCE_id ,每次新增的时候 Next_Val

2、DBS

存储hive的DB信息,包括描述信息、存储路径、数据库名、拥有者和角色名

3、DATABASE_PARAMS

db的key-value参数 ,不清楚用途。

###4、SDS

提供文件路径location、InputFormat、OutputFormat、是否压缩、是否是子文件夹存储、SerDe类(对应于SERDES表)。

SerDe类表示各种序列化和反序列化的类。

5、SD_PARAMS

每个SDS的key-value参数

6、SERDES

每个SDS对应的存储的SerDer类,每个SDS记录一个SERDES表的记录

7、SERDE_PARAMS

SERDE的一些参数,主要是行分隔符、列分隔符、NULL字符串等等,可以每个SerDer自己定义

8、CDS

暂时没明白到底是什么,不过其id和tbl_id是一致的,貌似就是tbl_id

9、TBLS

table的具体信息。

Tabid、创建时间、数据库id、last_access、owner(这个后面会和权限控制有关)、表的存储位置id、表名、TBL_TYPE(外部表、内部表)等

10、TABLE_PARAMS

table级别的key-value参数

主要是总文件个数、总文件大小、comment、last_ddl_time(上次执行ddl的时间)、以及用户自定义的一些参数(orcfile中的参数)

11、COLUMNS_V2

列的信息

CD_ID对应的应该是tbl_id

12、PARTITION_KEYS

每个表的partitions 列

13、PARTITIONS

Partitions id 、create_time、part_name、sds_id、tbl_id

14、PARTITION_KEY_VALS

和上面的表对应,每个partitions对应的具体值

15、PARTITION_PARAMS

分区参数,暂时为找到怎么设置每个分区的key-value参数

16、PART_COL_STATS

对于每列的统计信息,在0.11以后增加了

ANALYZE table contline_revenue_day PARTITION(pdate='2014-03-09') compute statistics for COLUMNS contract_line_id , st_date ,contract_no ;

这样的ddl命令来用于统计每个分区的基本统计信息,用于优化

17. 未用到的空表

BUCKETING_COLS :

IDXS

INDEX_PARAMS

SKEWED_COL_NAMES

SKEWED_COL_VALUE_LOC_MAP

SKEWED_STRING_LIST

SKEWED_STRING_LIST_VALUES

SKEWED_VALUES

SORT_COLS

VERSION

1. 准备数据

1
2
3
4
5
6
7
8
浙江,杭州,300
浙江,宁波,150
浙江,温州,200
浙江,嘉兴,100
江苏,南京,270
江苏,苏州,299
江苏,某市,200
江苏,某某市,100

2. 创建表

1
2
3
4
CREATE table pcp
(province string,city string,people int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

3. 导入数据

load data inpath ‘/tmp/1.txt’ into table pcp;

4. 普通查询

1
select * from pcp order by people desc;
1
2
3
4
5
6
7
8
浙江    杭州    300
浙江 宁波 150
浙江 温州 200
浙江 嘉兴 100
江苏 南京 270
江苏 苏州 299
江苏 某市 200
江苏 某某市 100

5. 综合查询

1
2
3
4
5
6
select province,city,
rank() over(order by people desc) rank,
dense_rank() over(order by people desc) dense_rank,
row_number() over(order by people desc) row_number
from pcp
group by province,city,people;
浙江    杭州    300    1    1    1
江苏    苏州    299    2    2    2
江苏    南京    270    3    3    3
江苏    某市    200    4    4    4
浙江    温州    200    4    4    5
浙江    宁波    150    6    5    6
江苏    某某市    100    7    6    7
浙江    嘉兴    100    7    6    8

主要注意4,5,6行的:

row_number:顺序下来,
rank:在遇到数据相同项时,会留下空位,(4,5,6 第一列,4,4,6)
dense_rank:在遇到数据相同项时,不会留下空位,(4,5,6第一列,4,4,5)

6. 分组统计查询

1
2
3
4
5
6
select province,city,
rank() over (partition by province order by people desc) rank,
dense_rank() over (partition by province order by people desc) dense_rank,
row_number() over(partition by province order by people desc) row_number
from pcp
group by province,city,people;
江苏    苏州    299    1    1    1
江苏    南京    270    2    2    2
江苏    某市    200    3    3    3
江苏    某某市    100    4    4    4
浙江    杭州    300    1    1    1
浙江    温州    200    2    2    2
浙江    宁波    150    3    3    3
浙江    嘉兴    100    4    4    4

To examine the internal structure and data of Parquet files, you can use the parquet-tools command that comes with CDH. Make sure this command is in your $PATH. (Typically, it is symlinked from /usr/bin; sometimes, depending on your installation setup, you might need to locate it under a CDH-specific bin directory.) The arguments to this command let you perform operations such as:

  • cat: 打印文件的全部内容信息到标准输出
  • head: 打印文件的头几行内容信息到标准输出
  • schema: 打印Parquet的schema
  • meta: 打印 file footer metadata, 包含 key-value properties (like Avro schema), compression ratios, encodings, compression used, and row group information.
  • dump: 打印所有数据和metadata.

如果没有将jar配置到环境变量,可以使用java -jar parquet-tools-1.8.2.jar -h

Use parquet-tools -h to see usage information for all the arguments. Here are some examples showing parquet-tools usage

cat 命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
$ # Be careful doing this for a big file! Use parquet-tools head to be safe.
$ parquet-tools cat sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0

year = 1992
month = 1
day = 3
...

head 命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ parquet-tools head -n 2 sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0

year = 1992
month = 1
day = 3
...

schema命令

1
2
3
4
5
6
7
8
9
10
11
12
13
$ parquet-tools schema sample.parq
message schema {
optional int32 year;
optional int32 month;
optional int32 day;
optional int32 dayofweek;
optional int32 dep_time;
optional int32 crs_dep_time;
optional int32 arr_time;
optional int32 crs_arr_time;
optional binary carrier;
optional int32 flight_num;
...

meta 命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
$ parquet-tools meta sample.parq
creator: impala version 2.2.0-cdh5.4.3 (build 517bb0f71cd604a00369254ac6d88394df83e0f6)

file schema: schema
-------------------------------------------------------------------
year: OPTIONAL INT32 R:0 D:1
month: OPTIONAL INT32 R:0 D:1
day: OPTIONAL INT32 R:0 D:1
dayofweek: OPTIONAL INT32 R:0 D:1
dep_time: OPTIONAL INT32 R:0 D:1
crs_dep_time: OPTIONAL INT32 R:0 D:1
arr_time: OPTIONAL INT32 R:0 D:1
crs_arr_time: OPTIONAL INT32 R:0 D:1
carrier: OPTIONAL BINARY R:0 D:1
flight_num: OPTIONAL INT32 R:0 D:1
...

row group 1: RC:20636601 TS:265103674
-------------------------------------------------------------------
year: INT32 SNAPPY DO:4 FPO:35 SZ:10103/49723/4.92 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
month: INT32 SNAPPY DO:10147 FPO:10210 SZ:11380/35732/3.14 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
day: INT32 SNAPPY DO:21572 FPO:21714 SZ:3071658/9868452/3.21 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dayofweek: INT32 SNAPPY DO:3093276 FPO:3093319 SZ:2274375/5941876/2.61 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dep_time: INT32 SNAPPY DO:5367705 FPO:5373967 SZ:28281281/28573175/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_dep_time: INT32 SNAPPY DO:33649039 FPO:33654262 SZ:10220839/11574964/1.13 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
arr_time: INT32 SNAPPY DO:43869935 FPO:43876489 SZ:28562410/28797767/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_arr_time: INT32 SNAPPY DO:72432398 FPO:72438151 SZ:10908972/12164626/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
carrier: BINARY SNAPPY DO:83341427 FPO:83341558 SZ:114916/128611/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
flight_num: INT32 SNAPPY DO:83456393 FPO:83488603 SZ:10216514/11474301/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
...

Meta Legend

Row Group Totals

Acronym Definition
RC Row Count
TS Total Byte Size

Row Group Column Details

Acronym Definition
DO Dictionary Page Offset
FPO First Data Page Offset
SZ:{x}/{y}/{z} Size in bytes. x = Compressed total, y = uncompressed total, z = y:x ratio
VC Value Count
RLE Run-Length Encoding

获取文件行数 - ParquetFileReader.readFooters

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String hdfspath = "/user/mls_zl/mysql2hdfs/parquet/time";
Configuration configuration = new Configuration(true);
configuration.set("fs.defaultFS", "hdfs://10.100.1.131:9000");
Path inputPath = new Path(hdfspath);

FileStatus inputFileStatus = inputPath.getFileSystem(configuration).getFileStatus(inputPath);
List<Footer> footers = org.apache.parquet.hadoop.ParquetFileReader.readFooters(HdfsUtils.getConfiguration(),
inputFileStatus, false);
for (Footer footer : footers) {
System.out.println("file:" + footer.getFile().toString());
long cout = 0;
for (BlockMetaData blockMetaData : footer.getParquetMetadata().getBlocks()) {
cout += blockMetaData.getRowCount();
}

System.out.println("size:" + cout);
}

递归统计 - RemoteIterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
fs = FileSystem.get(HdfsUtils.getConfiguration());
Path parquetFile;
boolean isFirst = true;

RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(tablePath), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
if (fileStatus.isFile() && fileStatus.getPath().toString().toLowerCase().endsWith(".parquet")) {
parquetFile = fileStatus.getPath();
fileSize += fileStatus.getLen();

parquetFileReader = new ParquetFileReader(HdfsUtils.getConfiguration(), parquetFile,
ParquetMetadataConverter.NO_FILTER);
rows += parquetFileReader.getRecordCount();
if (isFirst) {
columns = (long) parquetFileReader.getFileMetaData().getSchema().getFieldCount();
isFirst = false;
}
parquetFileReader.close();
}
}

读取文件信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void read1(){
ParquetMetadata readFooter = ParquetFileReader.readFooter(fs.getConf(), path, ParquetMetadataConverter.NO_FILTER);
MessageType schema = readFooter.getFileMetaData().getSchema();
List<type> columnInfos = schema.getFields();
ParquetReader<group> reader = ParquetReader.builder(new GroupReadSupport(), path).
withConf(fs.getConf()).build();
int count = 0;
Group recordData = reader.read();

while (count < 10 && recordData != null) {
int last = columnInfos.size() - 1;
StringBuilder builder = new StringBuilder();
builder.append("{\"");
for (int j = 0; j < columnInfos.size(); j++) {
if (j < columnInfos.size() - 1) {
String columnName = columnInfos.get(j).getName();
String value = recordData.getValueToString(j, 0);
builder.append(columnName + "\":\"" + value + "\",");
}
}
String columnName = columnInfos.get(last).getName();
String value = recordData.getValueToString(last, 0);

System.out.println(builder.toString());
count++;
recordData = reader.read();
}

} catch (Exception e) {
}
}

一、sqoop生成

sqoop 将 mysql 等关系型数据库的数据导入到hive数据库中 并 生成parquet文件时,会将date类型转换成 INT64

同时 sqoop 会在 parquet 的fileMetaData中记录原先的sql类型信息,如下所示:

type sql type parquet type 字段名
long -5 INT64 TABLE_ID
string 12 binary TABLE_NAME
int 4 int32 TABLE_TYPE
long 93 Int64 MODIFY_TIME
bytes -4 binary TABLE_SCHEMA
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
{
"type": "record",
"name": "dw_data_table",
"doc": "Sqoop import of dw_data_table",
"fields": [
{
"name": "TABLE_ID",
"type": [
"null",
"long"
],
"default": null,
"columnName": "TABLE_ID",
"sqlType": "-5"
},
{
"name": "TABLE_NAME",
"type": [
"null",
"string"
],
"default": null,
"columnName": "TABLE_NAME",
"sqlType": "12"
},
{
"name": "TABLE_TYPE",
"type": [
"null",
"int"
],
"default": null,
"columnName": "TABLE_TYPE",
"sqlType": "4"
},
{
"name": "MODIFY_TIME",
"type": [
"null",
"long"
],
"default": null,
"columnName": "MODIFY_TIME",
"sqlType": "93"
},
{
"name": "TABLE_SCHEMA",
"type": [
"null",
"bytes"
],
"default": null,
"columnName": "TABLE_SCHEMA",
"sqlType": "-4"
}
],
"tableName": "dw_data_table"
}

二、spark 生成

Spark 将 mysql 等关系型数据库的数据导入到hdfs中 并 生成parquet文件时,会将date类型转换成 INT96

同样 spark也会在 parquet 的fileMetaData中记录相关类型信息,如下所示:

type 字段名 parquet type
long TABLE_ID INT64
string TABLE_NAME binary
int TABLE_TYPE int32
timestamp LAST_UPDATE_TIME Int96
binary TABLE_SCHEMA binary
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
{
"type": "struct",
"fields": [
{
"name": "TABLE_ID",
"type": "long",
"nullable": false,
"metadata": {
"name": "TABLE_ID",
"scale": 0
}
},
{
"name": "TABLE_NAME",
"type": "string",
"nullable": false,
"metadata": {
"name": "TABLE_NAME",
"scale": 0
}
},
{
"name": "TABLE_TYPE",
"type": "integer",
"nullable": false,
"metadata": {
"name": "TABLE_TYPE",
"scale": 0
}
},
{
"name": "LAST_UPDATE_TIME",
"type": "timestamp",
"nullable": false,
"metadata": {
"name": "LAST_UPDATE_TIME",
"scale": 0
}
},
{
"name": "TABLE_SCHEMA",
"type": "binary",
"nullable": true,
"metadata": {
"name": "TABLE_SCHEMA",
"scale": 0
}
}
]
}

三、 总结

3.1 对于 INT96

一定是timestamp类型

3.2 对于INT64

可能是

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
typeName match {
case BOOLEAN => BooleanType

case FLOAT => FloatType

case DOUBLE => DoubleType

case INT32 =>
originalType match {
case INT_8 => ByteType
case INT_16 => ShortType
case INT_32 | null => IntegerType
case DATE => DateType
case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS)
case UINT_8 => typeNotSupported()
case UINT_16 => typeNotSupported()
case UINT_32 => typeNotSupported()
case TIME_MILLIS => typeNotImplemented()
case _ => illegalType()
}

case INT64 =>
originalType match {
case INT_64 | null => LongType
case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS)
case UINT_64 => typeNotSupported()
case TIMESTAMP_MILLIS => typeNotImplemented()
case _ => illegalType()
}

case INT96 =>
ParquetSchemaConverter.checkConversionRequirement(
assumeInt96IsTimestamp,
"INT96 is not supported unless it's interpreted as timestamp. " +
s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
TimestampType

case BINARY =>
originalType match {
case UTF8 | ENUM | JSON => StringType
case null if assumeBinaryIsString => StringType
case null => BinaryType
case BSON => BinaryType
case DECIMAL => makeDecimalType()
case _ => illegalType()
}

case FIXED_LEN_BYTE_ARRAY =>
originalType match {
case DECIMAL => makeDecimalType(maxPrecisionForBytes(field.getTypeLength))
case INTERVAL => typeNotImplemented()
case _ => illegalType()
}

case _ => illegalType()
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public static Map<String, Long> statisticsParquet(String tablePath) throws IOException {
Map<String, Long> map = new HashMap<>(3);
Long rows = 0L;
Long columns = 0L;
Long fileSize = 0L;

FileSystem fs = null;
ParquetFileReader parquetFileReader = null;
try {
fs = FileSystem.get(HdfsUtils.getConfiguration());
Path parquetFile;
boolean isFirst = true;

RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(tablePath), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
if (fileStatus.isFile() && fileStatus.getPath().toString().toLowerCase().endsWith(".parquet")) {

parquetFile = fileStatus.getPath();
fileSize += fileStatus.getLen();

parquetFileReader = new ParquetFileReader(HdfsUtils.getConfiguration(), parquetFile,
ParquetMetadataConverter.NO_FILTER);
rows += parquetFileReader.getRecordCount();

if (isFirst) {
columns = (long) parquetFileReader.getFileMetaData().getSchema().getFieldCount();
isFirst = false;
}

parquetFileReader.close();
}
}

} finally {
if (fs != null) {
try {
fs.close();
} catch (IOException e) {
logger.error("FileSystem close ", e);
}
}
if (parquetFileReader != null) {
try {
parquetFileReader.close();
} catch (IOException e) {
logger.error(e.getLocalizedMessage(), e);
}
}
}

map.put("rows", rows);
map.put("columns", columns);
map.put("fileSize", fileSize);
return map;
}