
Parquet file reading

Getting the file row count - ParquetFileReader.readFooters

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

String hdfspath = "/user/mls_zl/mysql2hdfs/parquet/time";
Configuration configuration = new Configuration(true);
configuration.set("fs.defaultFS", "hdfs://10.100.1.131:9000");
Path inputPath = new Path(hdfspath);

// Read the footer of every Parquet file under the path (false = do not skip row group metadata).
FileStatus inputFileStatus = inputPath.getFileSystem(configuration).getFileStatus(inputPath);
List<Footer> footers = ParquetFileReader.readFooters(configuration, inputFileStatus, false);
for (Footer footer : footers) {
    System.out.println("file:" + footer.getFile().toString());
    // A file's row count is the sum of the row counts of its row groups (blocks).
    long count = 0;
    for (BlockMetaData blockMetaData : footer.getParquetMetadata().getBlocks()) {
        count += blockMetaData.getRowCount();
    }
    System.out.println("size:" + count);
}
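
In newer parquet-hadoop releases readFooters is deprecated. A minimal sketch of the same row count with the ParquetFileReader.open / HadoopInputFile API (assuming parquet-hadoop 1.9 or later; the file name is a hypothetical placeholder, since open() expects a single file rather than a directory):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

Configuration conf = new Configuration(true);
conf.set("fs.defaultFS", "hdfs://10.100.1.131:9000");
// Hypothetical file name; open() works on a single Parquet file, not a directory.
Path file = new Path("/user/mls_zl/mysql2hdfs/parquet/time/part-00000.parquet");

// open() reads only the footer; getRecordCount() sums the row counts of all row groups.
try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
    System.out.println("rows: " + reader.getRecordCount());
}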

Recursive counting - RemoteIterator

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;

// HdfsUtils.getConfiguration() is the project's helper returning a Hadoop Configuration;
// tablePath is the root directory of the table.
FileSystem fs = FileSystem.get(HdfsUtils.getConfiguration());
Path parquetFile;
long fileSize = 0, rows = 0, columns = 0;
boolean isFirst = true;

// listFiles(..., true) iterates recursively over all files under tablePath.
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(tablePath), true);
while (listFiles.hasNext()) {
    LocatedFileStatus fileStatus = listFiles.next();
    if (fileStatus.isFile() && fileStatus.getPath().toString().toLowerCase().endsWith(".parquet")) {
        parquetFile = fileStatus.getPath();
        fileSize += fileStatus.getLen();

        // Only the footer is read, so counting rows is cheap even for large files.
        ParquetFileReader parquetFileReader = new ParquetFileReader(HdfsUtils.getConfiguration(), parquetFile,
                ParquetMetadataConverter.NO_FILTER);
        rows += parquetFileReader.getRecordCount();
        if (isFirst) {
            // All files of one table share a schema, so take the column count from the first file.
            columns = parquetFileReader.getFileMetaData().getSchema().getFieldCount();
            isFirst = false;
        }
        parquetFileReader.close();
    }
}
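
ParquetFileReader implements Closeable, so if getRecordCount() throws, the explicit close() above is skipped and the reader leaks. A minimal sketch of the same per-file step with try-with-resources (reusing the variables from the loop above):

// Same per-file step, but the reader is closed even if an exception is thrown.
try (ParquetFileReader reader = new ParquetFileReader(HdfsUtils.getConfiguration(), parquetFile,
        ParquetMetadataConverter.NO_FILTER)) {
    rows += reader.getRecordCount();
    if (isFirst) {
        columns = reader.getFileMetaData().getSchema().getFieldCount();
        isFirst = false;
    }
}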

Reading file contents

// fs (FileSystem) and path (Path to a Parquet file) are fields of the enclosing class.
public void read1() {
    try {
        // Read the footer to get the schema and its column list.
        ParquetMetadata readFooter = ParquetFileReader.readFooter(fs.getConf(), path, ParquetMetadataConverter.NO_FILTER);
        MessageType schema = readFooter.getFileMetaData().getSchema();
        List<Type> columnInfos = schema.getFields();

        // GroupReadSupport materializes each record as a Group.
        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
                .withConf(fs.getConf()).build();
        int count = 0;
        Group recordData = reader.read();

        // Print the first 10 records as simple JSON strings.
        while (count < 10 && recordData != null) {
            StringBuilder builder = new StringBuilder();
            builder.append("{");
            for (int j = 0; j < columnInfos.size(); j++) {
                String columnName = columnInfos.get(j).getName();
                String value = recordData.getValueToString(j, 0);
                builder.append("\"").append(columnName).append("\":\"").append(value).append("\"");
                if (j < columnInfos.size() - 1) {
                    builder.append(",");
                }
            }
            builder.append("}");

            System.out.println(builder.toString());
            count++;
            recordData = reader.read();
        }
        reader.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
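
getValueToString(j, 0) assumes every field is present exactly once in the record; with optional columns a missing value makes it throw. A minimal sketch of a per-field guard for flat schemas (a drop-in for the value lookup inside the loop above):

// getFieldRepetitionCount returns 0 when an optional field is absent in this record.
String value = recordData.getFieldRepetitionCount(j) > 0
        ? recordData.getValueToString(j, 0)
        : "null";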