0%

parquet_classes

1、parquet文件结构相关类

ParquetMetadata

ParquetMetaData类封装了Parquet文件的元数据信息,其包含一个FileMetaData类和一个BlockMetaData List,并且提供静态方法,采用org.codehaus.jackson包将ParquetMetaData变成json格式,当然也提供函数将json格式的元数据转换成一个ParquetMetaData对象。

1
2
3
4
5
6
public class ParquetMetadata {
private static ObjectMapper objectMapper = new ObjectMapper();
private static ObjectMapper prettyObjectMapper = new ObjectMapper();
private final FileMetaData fileMetaData;
private final List<BlockMetaData> blocks;
}

FileMetaData

FileMetaData类包含文件的元数据,包含数据描述信息schema、String键值对Map<String,String>、以及文件创建版本信息。

1
2
3
4
5
6
public final class FileMetaData implements Serializable {
private static final long serialVersionUID = 1L;
private final MessageType schema;
private final Map<String, String> keyValueMetaData;
private final String createdBy;
}

keyValueMetaData中一般会存储parquet的补充信息,不同软件生成的信息不同:

  • sparksql生成的信息如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
{
org.apache.spark.sql.parquet.row.metadata={
"type":"struct",
"fields":[
{"name":"apply_cnt","type":"string","nullable":true,"metadata":{}},
{"name":"bbr_addressno","type":"string","nullable":true,"metadata":{}},
{"name":"bbr_age","type":"string","nullable":true,"metadata":{}},
{"name":"bbr_is_has_medica","type":"string","nullable":true,"metadata":{}},
{"name":"bbr_marriage","type":"string","nullable":true,"metadata":{}},
{"name":"bbr_sex","type":"string","nullable":true,"metadata":{}},
{"name":"bd_amnt_sum","type":"string","nullable":true,"metadata":{}},
{"name":"bd_lccont_year","type":"string","nullable":true,"metadata":{}},
{"name":"dir_work_time","type":"string","nullable":true,"metadata":{}},
{"name":"dlr_bd_chuxian","type":"string","nullable":true,"metadata":{}},
{"name":"dlr_if_have_backgroud","type":"string","nullable":true,"metadata":{}},
{"name":"dlr_kind_accresult2","type":"string","nullable":true,"metadata":{}},
{"name":"grtno","type":"string","nullable":true,"metadata":{}},
{"name":"label","type":"string","nullable":true,"metadata":{}},
{"name":"last_declinepay_amt","type":"double","nullable":true,"metadata":{}},
{"name":"last_declinepay_cnt","type":"double","nullable":true,"metadata":{}},
{"name":"last_realpay_amt","type":"double","nullable":true,"metadata":{}},
{"name":"last_realpay_cnt","type":"double","nullable":true,"metadata":{}},
{"name":"pol_num","type":"double","nullable":true,"metadata":{}},
{"name":"prem_sum","type":"double","nullable":true,"metadata":{}},
{"name":"prt_accidentdate_del","type":"double","nullable":true,"metadata":{}},
{"name":"self_pol","type":"double","nullable":true,"metadata":{}},
{"name":"tabfeemoney","type":"double","nullable":false,"metadata":{}},
{"name":"tbr_accilccont_cnt","type":"double","nullable":false,"metadata":{}},
{"name":"tbr_age","type":"double","nullable":false,"metadata":{}},
{"name":"tbr_appntsex","type":"double","nullable":false,"metadata":{}},
{"name":"tbr_lccont_cnt","type":"double","nullable":false,"metadata":{}},
{"name":"tbr_marriage","type":"double","nullable":false,"metadata":{}},
{"name":"tbr_year_prem","type":"double","nullable":false,"metadata":{}},
{"name":"bit_result","type":"string","nullable":true,"metadata":{}}
]
}}
  • sqoop生成的如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
avro.schema={
"type":"record",
"name":"dw_data_table",
"doc":"Sqoop import of dw_data_table",
"fields":[
{"name":"TABLE_ID","type":["null","long"],"default":null,"columnName":"TABLE_ID","sqlType":"-5"},
{"name":"TABLE_NAME","type":["null","string"],"default":null,"columnName":"TABLE_NAME","sqlType":"12"},
{"name":"TABLE_TYPE","type":["null","int"],"default":null,"columnName":"TABLE_TYPE","sqlType":"4"},
{"name":"DW_ID","type":["null","long"],"default":null,"columnName":"DW_ID","sqlType":"-5"},
{"name":"NODE_INSTANCE_ID","type":["null","long"],"default":null,"columnName":"NODE_INSTANCE_ID","sqlType":"-5"},
{"name":"COLUMNS","type":["null","long"],"default":null,"columnName":"COLUMNS","sqlType":"-5"},
{"name":"TABLE_ROWS","type":["null","long"],"default":null,"columnName":"TABLE_ROWS","sqlType":"-5"},
{"name":"FILE_SIZE","type":["null","long"],"default":null,"columnName":"FILE_SIZE","sqlType":"-5"},
{"name":"FILE_PATH","type":["null","string"],"default":null,"columnName":"FILE_PATH","sqlType":"12"},
{"name":"MODIFY_TIME","type":["null","long"],"default":null,"columnName":"MODIFY_TIME","sqlType":"93"},
{"name":"STORAGE_TYPE","type":["null","int"],"default":null,"columnName":"STORAGE_TYPE","sqlType":"4"},
{"name":"TABLE_DESC","type":["null","string"],"default":null,"columnName":"TABLE_DESC","sqlType":"12"},
{"name":"STATUS","type":["null","int"],"default":null,"columnName":"STATUS","sqlType":"4"},
{"name":"LAST_UPDATE_TIME","type":["null","long"],"default":null,"columnName":"LAST_UPDATE_TIME","sqlType":"93"},
{"name":"LAST_UPDATE_OPER","type":["null","string"],"default":null,"columnName":"LAST_UPDATE_OPER","sqlType":"12"},
{"name":"CREATE_TIME","type":["null","long"],"default":null,"columnName":"CREATE_TIME","sqlType":"93"},
{"name":"CREATE_OPER","type":["null","string"],"default":null,"columnName":"CREATE_OPER","sqlType":"12"},
{"name":"TABLE_SCHEMA","type":["null","bytes"],"default":null,"columnName":"TABLE_SCHEMA","sqlType":"-4"}
],
"tableName":"dw_data_table"}}

BlockMetaData

row group元数据

1
2
3
4
5
6
public class BlockMetaData {
private List<ColumnChunkMetaData> columns = new ArrayList();
private long rowCount;
private long totalByteSize;
private String path;
}

MessageType

MessageType 是 GroupType 的子类,代表Parquet描述数据字段的schema的根节点

1
2
public final class MessageType extends GroupType {
}

2、数据类型相关类

Type

抽象类Type封装了当前字段的名称、重复类型(Repetition)、以及逻辑类型(OriginalType)。

其中OriginalType中对应关系:

MAP LIST UTF8 MAP_KEY_VALUE ENUM DECIMAL
哈希映射表Map 线性表List UTF8编码的字符串 包含键值对的Map 枚举类型 十进制数
1
2
3
4
5
6
public abstract class Type {
private final String name;
private final Type.Repetition repetition;
private final OriginalType originalType;
private final Type.ID id;
}

Type 有两个子类PrimitiveType和GroupType,分别代表Parquet支持的原始数据类型和Group多个字段的组合类型。

GroupType

多个字段的组合类型

1
2
3
4
public class GroupType extends Type {
private final List<Type> fields;
private final Map<String, Integer> indexByName;
}

PrimitiveType

Parquet支持的原始数据类型

1
2
3
4
5
public final class PrimitiveType extends Type {
private final PrimitiveType.PrimitiveTypeName primitive;
private final int length;
private final DecimalMetadata decimalMeta;
}

3、Group相关

Group

抽象类Group表示包含一组字段的Parquet schema节点类型,封装了各种类型的 add方法和get方法

SimpleGroup

SimpleGroup是Group的一个子类,一个最简单形式的Group:包含一个GroupType 和字段数据。

GroupType表示Group类型。

List<Object>[] data保存该Group中的字段数据,各字段在List数组中的顺序和GroupType中定义的一致。List列表中既可以保存Primitive类型的原始数据类型,也可以保存一个Group。也就是说一个SimpleGroup类型可以表示由schema表示的一行记录。

1
2
3
4
public class SimpleGroup extends Group {
private final GroupType schema;
private final List<Object>[] data;
}

参考

列式存储 Parquet

sqoop导入到hdfs命令

指定parquet类型

1
sqoop import --connect jdbc:mysql://10.100.1.30:3306/zy-ds --username root --password root --table users --delete-target-dir --target-dir /user/mls/dw_data/2/ --as-parquetfile;

sqoop导入hive命令

增量导入

  1. 简单的增量导入,需自行确定时间分片
1
sqoop import --connect jdbc:mysql://10.100.1.30:3306/zy-ds2 --username root --password root --table calls --delete-target-dir --hive-import --hive-database zyds --hive-table calls -m 1 --where "split_date='20170101'";
  1. 使用 --incremental
    incremental 有两种模式:append 和 lastmodified。目前 hive import 暂不支持 append 模式。
    incremental 需要跟随两个参数(以下只针对 lastmodified 说明):
  • check-column 必须是 timestampdate 类型。(对于 db2 :check-column的列名必须大写
  • last-value 需要大于之前的最大值,一般取 字段最大值 +1
1
sqoop import --connect jdbc:mysql://10.100.1.30:3306/zy-ds2 --username root --password root --table calls --delete-target-dir --hive-import --hive-database zyds --hive-table calls -m 1 --incremental lastmodified --check-column crt_date --last-value "2015-11-25 12:41:40"

全量导入

其实就是将原有覆盖写入

1
sqoop import --connect jdbc:mysql://10.100.1.30:3306/zy-ds2 --username root --password root --table calls --delete-target-dir --hive-import --hive-database zyds --hive-table calls -m 1 --hive-overwrite;

取消SSL警告

警告如下:

1
Tue Aug 09 10:29:43 CST 2016 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

解决方案

产生报警的原因是因为,我搭建的Hive使用MySql作为metadata的存储,而MySql为5.7.12版本,需要在连接串中指定是否采用SSL连接。

所以我们只需修改Hive的 hive-site.xml,在连接串中加入指定SSL为false即可:

1
2
3
4
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://ut07:3306/hive?createDatabaseIfNotExist=true&amp;useUnicode=true&amp;characterEncoding=UTF-8&amp;useSSL=false</value>
</property>

实践中遇到的问题

1. Mysql 导到 hive 命令(单表导入命令):

进入/sqoop/bin目录下,执行如下语句:

1
./sqoop import --connect jdbc:mysql://10.100.1.30:3306/zy-ds2 --username root --password root --table calls --hive-import --hive-database zyds --hive-table calls -m 1;
1
2
3
4
5
6
--connect         jdbc:mysql://10.100.1.30:3306/zy-ds2为数据库url
--username root --password root 为数据库用户名密码
--table calls 导入表
--hive-database zyds 导入至hive内的数据库名
--hive-table calls 导入至hive内的表名
-m 1 表示启动几个map任务来读取数据(如果数据库中的表没有主键这个参数是必须设置的而且只能设定为1 )

db2 导入到 hive

若要使用DB2可将 db2jcc.jar 和 db2jcc_license_cu.jar加入sqoop的lib目录

1.1 FileAlreadyExistsException

17/02/17 12:27:02 ERROR tool.ImportTool: Encountered IOException running import job: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/user/hadoop/calls already exists

解决方法:

hadoop dfs -rm -r 相应目录。

或者
运行 sqoop 时添加 --delete-target-dir 参数 (放在hive-import 之前)

1.2 已经存在相应的表

如果hive数据库已经存在相应的表,可以通过添加参数 --hive-overwrite(放在hive-import 之后)

2. Mysql => hive 命令(多表导入命令):

使用sqoop-import-all-tables命令实现多表导入。

在使用多表导入之前,以下三个条件必须同时满足:
1、每个表必须都只有一个列作为主键;
2、必须将每个表中所有的数据导入,而不是部分;
3、你必须使用默认分隔列,且WHERE子句无任何强加的条件

2.1 例子

1
sqoop import-all-tables --connect jdbc:mysql://10.100.1.30:3306/zy-ds2 --username root --password root --hive-import --hive-database zyds -m 2

如需指定 hdfs 目录 可加参数 --warehouse-dir='/user/zl/'

NodeManager(NM)是YARN中每个节点上的代理,它管理Hadoop集群中单个计算节点,包括与ResourceManger保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务(auxiliary service)。

NodeManager

1 NodeManager内各组件功能

1.1 NodeStatusUpdater

当NM启动时,该组件向RM注册,并发送节点上可用资源。接下来,NM与RM通信,汇报各个Container的状态更新,包括节点上正运行的Container、已完成的Contaner等。
此外,RM可能向NodeStatusUpdater发信号,杀死处于运行中的Container。

注:NodeStatusUpdater是NM与RM通信的唯一通道,它实际上是RPC协议ResourceTracker的client,它周期性地调用RPC函数nodeHeartbeat()向RM汇报本节点上各种信息,包括资源使用情况,各个Container运行情况等。

1.2 ContainerManager

它是NodeManager中核心组件,它由以下几个子组件构成,每个子组件负责一部分功能,以管理运行在该节点上的所有Container。(注意,ContainerManager实际上是个接口,真正的实现是ContainerManagerImpl类)

1.2.1 RPC Server

ContainerManager从各个Application Master上接收RPC请求以启动Container或者停止正在运行的Container。它与ContainerTokenSecretManager(下面将介绍)合作,以对所有请求进行合法性验证。所有作用在正运行Container的操作均会被写入audit-log,以便让安全工具进行后续处理。
注:这里的“RPC Server”实际上是RPC协议ContainerManager的server,AM可通过该协议通知某个节点启动或者释放container,ContainerManager定义了三个接口供AM使用:

StartContainerResponse startContainer(StartContainerRequest request); //启动container
StopContainerResponse stopContainer(StopContainerRequest request); //释放container
GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request);//获取container列表。

1.2.2 ResourceLocalizationService

负责(从HDFS上)安全地下载和组织Container需要的各种文件资源。它尽量将文件分摊到各个磁盘上。它会为下载的文件添加访问控制限制,并为之施加合适的(磁盘空间)使用上限。

注:该服务会采用多线程方式同时从HDFS上下载文件,并按照文件类型(public或者private文件)存放到不同目录下,并为目录设置严格的访问权限,同时,每个用户可使用的磁盘空间大小也可以设置。

1.2.3 ContainersLauncher

维护了一个线程池,随时准备并在必要时尽快启动Container,同时,当收到来自RM或者 ApplicationMaster的清理Container请求时,会清理对应的Container进程。

1.2.4 AuxServices

NM提供了一个框架以通过配置附属服务扩展自己的功能,这允许每个节点定制一些特定框架可能需要的服务,当然,这些服务是与NM其他服务隔离开的(有自己的安全验证机制)。附属服务需要在NM启动之前配置好,且由对应应用程序的运行在本节点上的第一container触发启动。

1.2.5 ContainersMonitor

当一个Container启动之后,该组件便开始观察它在运行过程中的资源利用率。为了实现资源隔离和公平共享,RM为每个Container分配了一定量的资源。ContainersMonitor持续监控每个Container的利用率,一旦一个Container超出了它的允许使用份额,它将向Container发送信号将其杀掉,这可以避免失控的Container影响了同节点上其他正在运行的Container。(注意,ContainersMonitor实际上是个接口,真正的实现是ContainersMonitorImpl类)。

注:NM启动一个container后,ContainersMonitor会将该container进程对一个的pid添加到监控列表中,以监控以pid为根的整棵进程树的资源使用情况,它周期性地从/etc/proc中获取进程树使用的总资源,一旦发现超过了预期值,则会将其杀死。在最新版YARN中,已采用了Linux container对资源进行隔离。

1.2.6 LogHandler

一个可插拔组件,用户通过它可选择将Container日志写到本地磁盘上还是将其打包后上传到一个文件系统中。

1.3 ContainerExecutor

与底层操作系统交互,安全存放Container需要的文件和目录,进而以一种安全的方式启动和清除Container对应的进程。
注:在最新版YARN中,已采用了Linux container对资源进行隔离

1.4 NodeHealthCheckerService

提供以下功能:通过周期性地运行一个配置好的脚本检查节点的健康状况,它也会通过周期性地在磁盘上创建临时文件以监控磁盘健康状况。任何系统健康方面的改变均会通知NodeStatusUpdater(前面已经介绍过),它会进一步将信息传递给RM。

1.5 Security

(1) ApplicationACLsManager NM需要为所有面向用户的API提供安全检查,如在Web-UI上只能将container日志显示给授权用户。该组件为每个应用程序维护了一个ACL列表,一旦收到类似请求后会利用该列表对其进行验证。
(2) ContainerTokenSecretManager 检查收到的各种访问请求的合法性,确保这些请求操作已被RM授权。

1.6 WebServer

在给定时间点,展示该节点上所有应用程序和container列表,节点健康相关的信息和container产生的日志。

2 主要功能亮点

2.1 启动Container

为了能够启动Container,NM期望收到的Container定义了关于它运行时所需的详细信息,包括运行container的命令、环境变量、所需的资源列表和安全令牌等。
一旦收到container启动请求,如果YARN启用了安全机制,则NM首先验证请求合法性以对用户和正确的资源分配进行授权。之后,NM将按照以下步骤启动一个container:

  1. 在本地拷贝一份运行Container所需的所有资源(通过Distributed Cache实现)。
  2. 为container创建经隔离的工作目录,并在这些目录中准备好所有(文件)资源。
  3. 运行命令启动container

2.2 日志聚集

处理用户日志是过去令人头痛的事情之一。与MRv1不同,NM不再截取日志并将日志留单个节点(TaskTracker)上,而是将日志上传到一个文件系统中,比如HDFS,以此来解决日志管理问题。
在某个NM上,所有属于同一个应用程序的container日志经聚集后被写到(可能经过压缩处理)一个FS上的日志文件中,用户可通过YARN命令行工具,WEB-UI或者直接通过FS访问这些日志。

2.3 MapReduce shuffle如何利用NM的附属服务

运行MapReduce程序所需的shuffle功能是通过附属服务实现的,该服务会启动一个Netty Server,它知道如何处理来自Reduce Task的MR相关的shuffle请求。MR(MapReduce) AM(ApplicationMaster)为shuffle服务定义了服务ID,和可能需要的安全令牌,而NM向AM提供shuffle服务的运行端口号,并由AM传递给各个Reduce Task。

结论

在YARN中,NodeManager主要用于管理抽象的container,它只处理container相关的事情,而不必关心每个应用程序(如MapReduce Task)自身的状态管理,它也不再有类似于map slot和reduce slot的slot概念,正是由于上述各个模块间清晰的责任分离,NM可以很容易的扩展,且它的代码也更容易维护。

参考

介绍NodeManager

董的博客

##简介

在YARN中,ResourceManager负责集群中所有资源的统一管理和分配,它接收来自各个节点(NodeManager)的资源汇报信息,并把这些信息按照一定的策略分配给各个应用程序(ApplicationMasters)。

  1. NodeManager: 接收来自ResourceManager的信息,并管理单个节点上的可用资源
  2. ApplicationMasters: 负责向ResourceManager申请相关资源,并与NodeManagers一起处理containers的启动

resource_manager

1 客户端和RM的接口组件

  • ClientService: 客户端和Resource Manager的接口。这个组件控制所有从客户端到RM的RPC接口,包括application提交,中断,获取查询信息,集群统计等。
  • AdminService: 为了确保管理请求不受普通用户的请求影响,给操作者的命令更高的优先权,所有的管理操作,比如刷新节点列表,队列配置等,都通过这个独立的接口提供服务。

2 RM连接节点的组件

  • ResourceTrackerService: 这是响应所有节点RPC请求的组件。他负责注册新节点,拒绝任何无效/退役的节点请求,收集节点心跳并发送给YarnScheduler。他和下面说的NMLivelinessMonitor和NodesListManager结合紧密。
  • NMLivelinessMonitor: 跟踪活跃的节点,特别关注关闭和死掉得节点,这个组件保持跟踪每个节点的最新心跳时间。任何在设定间隔时间内没有新掉的节点被认为是死掉了,默认是10分钟,被RM认为是失效。所有当时运行在失效节点上的container被标记为死亡,不会有新的container被安排在这个节点。
  • NodesListManager: 收集有效和失效节点。通过读取配置文件yarn.resourcemanager.nodes.include-pathyarn.resourcemanager.nodes.exclude-path,来初始化节点列表。同时持续跟踪节点失效情况。

3 和每个AMs交互的组件

  • ApplicationMasterService: 这个组件响应所有AMs的RPCs请求。他负责处理注册新的AMs,中断/注销任何完成的AMs,从所有正在运行的AMs获取container分配和处理请求,并发送到YarnScheduler。他和下面说的AMLivelinessMonitor紧密结合。
  • AMLivelinessMonitor: 帮助管理活跃AMs和死亡/无响应AMs列表,这个组件持续跟踪每个AM和他的最新心跳时间。任何在配置的间隔时间内无心跳的AM被认为死亡,并从RM里失效,默认是10分钟。所有失效AM的containers也会被标记死亡。RM会安排同一个AM在一个新的container上,最大重试次数是默认是4。

4 ResourceManager的核心 – 调度器相关组件

  • ApplicationsManager: 负责维护一个已提交applications的集合。保存完成applications缓存,用来给用户的web UI或者命令行请求提供完成applications的信息。
  • ApplicationACLsManager: RM需要面对用户通过客户端或者管理者的API请求,并只允许授权的用户。这个组件维护着每个application的ACLs列表,强制接收杀应用和查看应用状态的请求。
  • ApplicationMasterLauncher: 维护一个线程池去启动最新提交applications的AMs,以及那些之前由于原因需要退出的applications的AMs。也负责当一个application正常完成或者强制中断后清理AM。
  • YarnScheduler: 这个调度者负责根据性能,队列等因素分配资源给各种运行的applications。他基于applications资源请求来执行调度,比如内存,CPU,磁盘,网络等。当前只支持内存,CPU支持很快会完成。
  • ContainerAllocationExpirer: 这个组件确保所有已经分配的containers正在被AMs使用,和之后要被启动的container对应的NMs。AMs上运行的是不被信赖的用户代码,分配的资源有可能不被使用,就造成了集群利用率的降低。为了解决这个问题,ContainerAllocationExpirer维护着一个已分配但是没有在相应NM上使用的containers列表。对于任意container,如果对应的NM在配置的时间间隔(默认10分钟)内没有报告给RM这个container已经启动,这个container就被RM认为是死亡了或者失效了。

5 TokenSecretManagers (for security)

ResourceManager 有几个 SecretManagers负责管理 tokens, secret-keys,用于认证/授权各种RPC接口的请求。一些YARN上的覆盖tokens,secret-keys和secret-managers的细节概要如下:

  • ApplicationTokenSecretManager: 为了避免从RM发送过来的任意进程的请求,RM使用每个应用的tokens,ApplicationTokens。这个组件保存token在本地内存里直到applications完成,使用他去认证从一个有效AM进程来的任何请求。
  • ContainerTokenSecretManager: ContainerTokens的SecretManager是被RM标记的特殊tokens用于给AM一个在指定节点上的container。ContainerTokens被AMs用于创建一个和相应的分配container的NM的连接。这个组件是RM特定的,跟踪相关的主机和secret-keys,并定时更新keys。
  • RMDelegationTokenSecretManager: 一个ResourceManager有一个特定的 delegation-token secret-manager. 他负责生成委托token给客户端,客户端可以传给未认证的希望和RM交互的进程。

6 DelegationTokenRenewer

在安全模式,RM是Kerberos认证的,提供更新文件系统的tokens服务给applications代表。这个组件更新已经提交的applications的tokens,只要这个applications在运行中,直到这个tokens不再被更新。

结束

在YARN里, ResourceManage主要被限制于调度用途,即只可以做系统中可用资源的竞争applications间的仲裁,并不关心每个application的状态管理。由于这种如之前所说的模块化的清晰的责任划分,使用前面说的强大的调度器API,RM能够满足最重要的设计要求 – 可扩展性,支持多种编程模型。

要允许不同的策略实现,RM的调度器是可插拔的,允许不同的策略算法。在未来很长时间,我们会深入挖掘各种特性的性能调度器,确保基于性能保证和队列安排containers。

参考网址

介绍resourcemanager

如前面所描述的, YARN 实质上是管理分布式app的系统。他由一个全局的ResourceManager和 每一个节点上的NodeManager组成。ResourceManager用于管理集群所有的可用资源,而每一个节点上的 NodeManager用于与ResourceManager沟通并负责管理单节点的可用资源。

yarnflow1

1、Resource Manager

在YARN里, ResourceManager 是一个主要的纯粹的scheduler。从本质上讲,它严格限制系统中可用资源的竞争。它优化了集群利用率(保持所有资源始终处于可用状态),以抵御各种限制,如容量保证,公平性和slas。为了允许不同的策略约束,资源管理器具有可插入的调度器,允许根据需要使用不同的算法,例如容量和公平调度。

2、ApplicationMaster

许多人会将yarn与现有的hadoop mapreduce系统(apache hadoop 1.x中的MR1)相提并论。然而,他们之间关键的区别:ApplicationMaster概念的加入。

ApplicationMaster 实际上是一个特定框架库的一个实例,负责与ResourceManager协商资源,并与NodeManager(s)一起执行和监视这些容器及其资源消耗。AM的功能就是向ResourceManager申请适当的资源容器,跟踪他们的状态和监测进度。

ApplicationMaster 使 YARN 具有以下几个特性:

  • 可扩展:ApplicationMaster提供给传统的ResourceManager很多功能,使整个系统拥有了更好的可扩展性。在测试中,我们已经成功模拟了10000个由现代硬件组成的节点集群,没有出现重大问题。这是我们选择将ResourceManager设计成纯调度器的关键原因之一,它并不试图为资源提供容错。我们将其转移到了ApplicationMaster实例的主要职责上。此外,由于每个应用程序都有一个ApplicationMaster的实例,所以ApplicationMaster本身并不是集群中常见的瓶颈。
  • 开放:将所有应用程序框架特定的代码移动到ApplicationMaster中,以使我们现在可以支持多种框架,比如MapReduce、mpi和图形处理。

下面有一些YARN设计的关键点:

  • 把所有的复杂性(尽可能的)交给ApplicationMaster,同时提供足够的功能以允许applicaiton-framework的开发者有足够的灵活性和权利。
  • 既然他实际上是用户端代码,所以不必信任ApplicationMasters,即任何ApplicationMaster都不是一个特权服务。
  • YARN 系统 (ResourceManager 和 NodeManager) 能够保护他们自己免受错误的或者恶意的ApplicationMasters的影响。

记住这点是重要的,每个application有它自己的ApplicationMaster实例。然而,ApplicationMaster管理一组applications也是完全可行的(比如Pig或者Hive的ApplicationMaster就管理一系列的MapReduce作业)。此外,这个概念已经被延伸到管理长期运行的服务,这些长期运行的服务管理着他们自己的应用(比如通过一个虚拟的HBaseAppMaster启动HBase)。

3、Resource Model

YARN 给applications提供了一个非常通用的Resource Model。一个application(通过ApplicationMaster)可以请求的资源包括如下:

  • Resource-name (hostname, rackname – 我们正在进一步将其用于支持更复杂的网络拓扑结构).
  • Memory (in MB)
  • CPU (cores, for now)
  • 将来, 我们会添加更多的资源类型比如磁盘/网络I/O,GPU等。

4、ResourceRequest and Container

YARN的设计允许个别应用程序(通过应用程序管理器)以共享,安全和多租户的方式利用集群资源。此外,为了有效地调度和优化数据访问,它使用集群拓扑结构以尽可能减少应用程序的数据移动。

为了实现这些目标,ResourceManager的Scheduler获取了广泛的application的资源需求信息,这样使他能够给集群里所有的applications做出更好的调度决定。这就带给我们了 ResourceRequestContainer.

4.1 ResourceRequest

一个application可以通过ApplicationMaster请求到足够的资源来满足application的资源需求。调度程序通过授予容器来响应资源请求,该容器满足初始资源请求中由应用程序管理器规定的要求。

看一下 ResourceRequest – 有如下形式:

<resource-name, priority, resource-requirement, number-of-containers>

####4.1.1 ResourceRequest的每个组件

  • resource-name: hostname, rackname 或者 * 表示没有特别要求。在未来,我们希望能够支持更复杂的虚拟机拓扑结构,更复杂的网络等等。
  • priority: 这个请求的优先级是应用内优先级(强调,这不是跨多个应用程序)。
  • resource-requirement: 如内存,CPU等(编写时只支持内存和CPU)。。
  • number-of-containers: 容器个数

4.2 Container

本质上,Container 是资源分配,这是ResourceManager 授予特定ResourceRequest的成功结果。Container为application授予在特定主机上使用特定数量的资源(内存,CPU等)的权限。

ApplicationMaster必须将Container提交给对应的NodeManager,以便使用资源启动其任务。当然,在安全模式下验证容器分配是为了确保 ApplicationMaster(s) 不能在集群中伪造分配。

4.2.1 容器运行期间的容器规格

如上所述,容器仅仅拥有在集群中的特定计算机上使用指定数量资源的权利,ApplicationMaster必须向NodeManager提供相当多的信息才能真正启动容器。

yarn允许应用程序启动任何进程,而不像hadoop-1.x(aka MR1)中现有的hadoop mapreduce,它不仅仅局限于java应用程序。

yarn容器的启动规范API是平台不可知的,包含:

  • 命令行来启动容器内的进程。
  • 环境变量。
  • 机器启动前必需的本地资源,如罐子,共享对象,辅助数据文件等
  • 与安全有关的令牌。

这允许ApplicationMaster使用NodeManager来启动容器,从unix / windows上的简单shell脚本到c / java / python进程到完整的虚拟机(例如kvms)。

5、YARN - 应用

掌握了上述概念的知识后,概略地说明applications在YARN上的工作原理是有用的。

applications执行包括以下步骤:

  • 申请提交。
  • 引导applications的ApplicationMaster实例。
  • applications执行由ApplicationMaster实例管理。

让我们来看一下应用程序的执行顺序(步骤如图所示):

  1. 客户端程序提交applications,包括启动特定于applications的ApplicationMaster本身的必要规范。
  2. Resource Manager承担协商指定容器的责任,在该容器中启动ApplicationMaster,然后启动ApplicationMaster。
  3. 在启动时,ApplicationMaster向ResourceManager注册 - 注册允许客户端程序查询资源管理器的细节,这使得它可以直接与自己的ApplicationMaster进行通信。
  4. 在正常操作期间,ApplicationMaster通过资源请求协议来协商合适的资源容器。
  5. 在成功分配容器时,ApplicationMaster通过向NodeManager提供容器启动规范来启动容器。启动规范通常包含允许容器与应用程序管理器本身通信的必要信息。
  6. 在容器内执行的应用程序代码然后通过应用程序特定的协议向其应用程序主管提供必要的信息(进度,状态等)。
  7. 在应用程序执行期间,提交程序的客户端通过应用程序特定的协议直接与应用程序管理器通信以获取状态,进度更新等。
  8. 一旦申请完成,并且所有必要的工作已经完成,申请管理员就会注销资源管理器并关闭,从而允许自己的容器被重新利用。

img

参考

APACHE HADOOP YARN – CONCEPTS AND APPLICATIONS

一、Apache Hadoop MapReduce

Apache Hadoop MapReduce是一个Google MapReduce编程模型的开源版本,由Apache基金会维护。现在,已经有人花了超过6年的时间在Hadoop上。但是,基本上MapReduce基本上可以分为三个主要部分:

  • MapReduce API:提供给终端用户(程序猿)开发MR程序的接口;
  • MapReduce 框架:MR各个过程(phrase)的实现,如:map phrase、reduce phrase、sort/shuffle/merge phrase等;
  • MapReduce 系统:运行用户MR程序的后端基础设施,用以管理资源、调度任务等。

将MR分成以上三个概念非常的重要,特别是对终端用户,他们可以完全专注于MR逻辑代码的编写,只需要通过API既可,由MR系统来解决资源管理、容错、调度的问题,而不需要用户考虑后端框架和系统的细节。

现在工业界大部分还是用的0.23之前的版本(至少我待的公司还是0.20.2),老版本的MapReduce系统是简易的Master-Slaves结构,具体名字叫JobTracker-TaskTracker。

JobTracker负责资源的管理(结点资源、计算资源等)以及任务生命周期管理(任务调度、进度查看、容错等)。而TaskTracker职责非常简单,开启/销毁任务,向JobTracker汇报任务状态。

旧版的架构其实挺清晰的,不过也有很多不足的地方,业界一直嚷着要给MR一次大整修(Overhaul),JobTracker的可靠性是一直被诟病的一点(虽然我没见它挂过,但是风险一直存在着),但是除了JobTracker的单点问题,其它的问题也需要一一列出来。

1.1 不支持其它编程模型

MapReduce对大多数应用(尤其是大数据统计分析)来说,都非常合适。但是有的时候,可能现实生活也有其它的编程模型,如图算法(Google Pregel/Apache Giapah)或者是迭代式模型(MPI)。当企业的所有数据在放在了HDFS上,有多种处理数据的方式就很重要了。

而且,MR本质上是面向批处理的,并不支持实时或接近实时的处理请求,但是业界也希望Hadoop能支持实时计算。(我也一直希望可以支持实时计算,但是有时候觉得有点贪心,专注做一项不就好了么?但是好像人的贪欲是无穷的)

有了以上的需求,为了降低了管理者使用成本,减少数据在HDFS和其它存储设备的迁移,Hadoop开发组织重新投入了Hadoop设计。

1.2 低可扩展性

摩尔定律一直在生效,也让商用服务器的性能一直提高,以下就是一台商用服务器在不同时间的配置:

  • 2009 - 8 cores, 16GB of RAM, 4*1TB disk
  • 2012 - 16+ cores, 48-96GB of RAM, 122TB or 123TB of disk

按照上面的配置,大约2-3年,服务器的配置就可以翻翻。而现在的Hadoop集群就只能支持10,000个节点和200,000个核。Hadoop软件需要赶上硬件的速度是非常重要的。顺带说句,我们公司的计算型服务器就是16cores 64GB of RAM。

1.3 服务器的低利用率

在现在的系统中,JobTracker将管理集群视为很多的Map/Reduce槽(slot),然而在MR用运行的时候,大多数时候都是reduce槽在等待map槽完成(map 100% reduce 0%)。如果能优化这个的话,服务器就可以得到最大的利用。

1.4 使用的灵活性

在现实生产环境中,Hadoop常常被部署成一个共享的、多用户的系统。这样就会导致一种情况,完全Hadoop软件可能会影响到整个部门。用户希望能够控制hadoop软件栈升级,因此,允许多版本的MapReduce框架并存对Hadoop来说就是很重要的了。

二、Apache Hadoop YARN

YARN的基本思想是将JobTracker的两个主要职责给解耦:资源管理和任务管理(监控/调度),YARN将其分成了两个部分:全局的ResourceManager(RM)和给每个应用分配的ApplicationMaster(AM)

2.1 ResourceManager

ResourceManager和它每个节点的slave——NodeManager(NM),形成了一个全新的、用以管理应用的分布式系统。

RM是系统资源的终极管理者,而AM则是一个特定应用框架的实体(每次提交任务的时候,需要编写相应的应用框架,现在只支持MapReduce),需要与RM索要应用资源,和NM一起执行和监控任务。

RM中有调度器,而调度器内嵌有策略可插拔的插件,主要负责将集群中得资源分配给多个队列和应用。当前MapReduce的调度器,如Capacity Scheduler和Fair Scheduler,均可作为该插件。但是调度器的职责仅限于调度任务,并不保证任务的容错性。

2.2 NodeManager

NodeManager有点类似于TaskTracker,它负责启动应用程序Container(类似于JVM),并监控container的资源(CPU、内存、磁盘、网络等),并将信息上报给ResouceManager。

ApplicationMaster负责向调度器请求合适的container,并监控container的状态以及任务进程。从系统的角度来看,ApplicationMaster本身也是一个普通的container。

2.3 YARN的架构图:

image002

新YARN系统比较重要的一条就是复用了原有的MapReduce框架,而并不需要大的改动,这对现有的MR应用以及用户来说,是非常重要的,具体是怎么复用的,以后再细说。

接下来,Hadoop开发者会深入架构细节,继续提高系统的可扩展性,并让其支持更多的数据处理框架(graph, MPI)并提高集群可用性。

以Hortonworks’ Arun Murthy(YARN开发者)的一段话做结尾吧:

“People are not going to be comfortable buying a $5 million Hadoop cluster just to do MapReduce and a $2 million cluster to do something else. If you can allow them to run both apps in the same cluster, its not only easier for you in terms of a CapEx perspective … it’s also easier from an operational perspective because you don’t have to have two separate sets of people managing your clusters or two sets of tools for managing your clusters.”

hadoop fs -getMerge 源码位于 hadoop-common 包的 org.apache.hadoop.fs.shell.CopyCommands类中,以下文章以2.7.3版本为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class CopyCommands {  
public static void registerCommands(CommandFactory factory) {
factory.addClass(Merge.class, "-getmerge");
factory.addClass(Cp.class, "-cp");
factory.addClass(CopyFromLocal.class, "-copyFromLocal");
factory.addClass(CopyToLocal.class, "-copyToLocal");
factory.addClass(Get.class, "-get");
factory.addClass(Put.class, "-put");
factory.addClass(AppendToFile.class, "-appendToFile");
}
/** merge multiple files together */
public static class Merge extends FsCommand {
public static final String NAME = "getmerge";
public static final String USAGE = "[-nl] <src> <localdst>";
public static final String DESCRIPTION =
"Get all the files in the directories that " +
"match the source file pattern and merge and sort them to only " +
"one file on local fs. <src> is kept.\n" +
"-nl: Add a newline character at the end of each file.";

protected PathData dst = null;
protected String delimiter = null;
protected List<PathData> srcs = null;

@Override
/** 准备输入输出变量 */
protected void processOptions(LinkedList<String> args) throws IOException {
try {
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "nl");
cf.parse(args);

delimiter = cf.getOpt("nl") ? "\n" : null;

dst = new PathData(new URI(args.removeLast()), getConf());
if (dst.exists && dst.stat.isDirectory()) {
throw new PathIsDirectoryException(dst.toString());
}
srcs = new LinkedList<PathData>();
} catch (URISyntaxException e) {
throw new IOException("unexpected URISyntaxException", e);
}
}

@Override
/** 依次将Input文件流写入outPut文件流*/
protected void processArguments(LinkedList<PathData> items)
throws IOException {
super.processArguments(items);
if (exitCode != 0) { // check for error collecting paths
return;
}
FSDataOutputStream out = dst.fs.create(dst.path);
try {
for (PathData src : srcs) {
FSDataInputStream in = src.fs.open(src.path);
try {
IOUtils.copyBytes(in, out, getConf(), false);
if (delimiter != null) {
out.write(delimiter.getBytes("UTF-8"));
}
} finally {
in.close();
}
}
} finally {
out.close();
}
}

@Override
protected void processNonexistentPath(PathData item) throws IOException {
exitCode = 1; // flag that a path is bad
super.processNonexistentPath(item);
}

// this command is handled a bit differently than others. the paths
// are batched up instead of actually being processed. this avoids
// unnecessarily streaming into the merge file and then encountering
// a path error that should abort the merge

@Override
protected void processPath(PathData src) throws IOException {
// for directories, recurse one level to get its files, else skip it
if (src.stat.isDirectory()) {
if (getDepth() == 0) {
recursePath(src);
} // skip subdirs
} else {
srcs.add(src);
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package test;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;

public class GetLogTask {

private static Logger logger = LoggerFactory.getLogger(GetLogTask.class);

public String getLog(ApplicationId appId, String appOwner, Configuration conf) throws Exception {
checkLogAggregationEnable(conf);
int resultCode = verifyApplicationState(appId, conf);
if (resultCode != 0) {
throw new Exception("Application has not completed." +
" Logs are only available after an application completes");
}

if (appOwner == null || appOwner.isEmpty()) {
appOwner = UserGroupInformation.getCurrentUser().getShortUserName();
}

return dumpAllContainersLogs(appId, appOwner, conf);
}

private String dumpAllContainersLogs(ApplicationId appId, String appOwner, Configuration conf) throws
Exception {

StringBuilder output = new StringBuilder();
Path remoteRootLogDir = new Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);

Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, appId, appOwner, logDirSuffix);
RemoteIterator<FileStatus> nodeFiles = null;
try {
Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
} catch (FileNotFoundException fnf) {
logDirNotExist(remoteAppLogDir.toString());
}
boolean foundAnyLogs = false;
assert nodeFiles != null;

//遍历 所有节点
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (!thisNodeFile.getPath().getName().endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
try {
DataInputStream valueStream;
AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
valueStream = reader.next(key);

while (valueStream != null) {

String containerString = "\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
output.append(containerString).append("\n");
output.append(StringUtils.repeat("=", containerString.length())).append("\n");
while (true) {
try {
readContainerLogs(valueStream, output, thisNodeFile.getModificationTime());
foundAnyLogs = true;
} catch (EOFException eof) {
break;
}
}

// Next container
key = new AggregatedLogFormat.LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
if (!foundAnyLogs) {
emptyLogDir(remoteAppLogDir.toString());
}

return output.toString();
}

private int verifyApplicationState(ApplicationId appId, Configuration conf) throws IOException,
YarnException {
YarnClient yarnClient = null;
try {
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
switch (appReport.getYarnApplicationState()) {
case NEW:
case NEW_SAVING:
case ACCEPTED:
case SUBMITTED:
case RUNNING:
return -1;
case FAILED:
case FINISHED:
case KILLED:
default:
break;
}
} finally {
if (yarnClient != null) {
yarnClient.close();
}
}
return 0;
}

private void logDirNotExist(String remoteAppLogDir) throws Exception {
throw new FileNotFoundException(remoteAppLogDir + " does not exist.\n" + "Log aggregation has not completed " +
"or is not " + "enabled.");
}

private void emptyLogDir(String remoteAppLogDir) throws Exception {
logger.warn(remoteAppLogDir + " does not have any log files.");
}


private void readContainerLogs(DataInputStream valueStream,
StringBuilder out, long logUploadedTime) throws IOException {
byte[] buf = new byte[65535];

String fileType = valueStream.readUTF();
String fileLengthStr = valueStream.readUTF();
long fileLength = Long.parseLong(fileLengthStr);

out.append("ContainerLog-start ").append(StringUtils.repeat("-", 80)).append("\n");

out.append("LogType:").append(fileType).append("\n");
if (logUploadedTime != -1) {
out.append("Log Upload Time:").append(Times.format(logUploadedTime)).append("\n");
}
out.append("LogLength:");
out.append(fileLengthStr).append("\n\n");
out.append("Log Contents:").append("\n");

long curRead = 0;
long pendingRead = fileLength - curRead;
int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
int len = valueStream.read(buf, 0, toRead);
while (len != -1 && curRead < fileLength) {
out.append(new String(buf, 0, len));
curRead += len;

pendingRead = fileLength - curRead;
toRead =
pendingRead > buf.length ? buf.length : (int) pendingRead;
len = valueStream.read(buf, 0, toRead);
}
out.append("End of LogType:").append(fileType).append("\n");
out.append("").append("\n");
}


private void checkLogAggregationEnable(Configuration conf) throws Exception {
String isEnable = conf.get(YarnConfiguration.LOG_AGGREGATION_ENABLED);
if (!"true".equalsIgnoreCase(isEnable)) {
throw new Exception("LOG_AGGREGATION_ENABLED must be ENABLED");
}
}
}

一. Hadoop 日志存放路径详解

Hadoop的日志大致可以分为两大类,且这两类的日志存放的路径是不一样的。本文基于Hadoop 2.x 版本进行说明的。

  1. Hadoop 系统服务输出的日志
  2. Mapreduce 程序输出来的日志 ( 作业运行日志 、任务运行日志 (Container 日志))

在 Hadoop 2.0 中,Mapreduce 程序的日志包含两部分,作业运行日志任务运行日志(Container 日志)

1.1 Hadoop系统服务输出的日志

诸如 NameNode、DataNode、ResourceManage 等系统自带的服务输出来的日志默认是存放在 ${HADOOP_HOME}/logs目录下。比如 resourcemanager 的输出日志为 yarn-${USER}-resourcemanager-${hostname}.log

  • yarn 指的就是该日志的属性即为 YARN,其他类似的有 mapred、hadoop 等
  • ${USER}s 是指启动 resourcemanager 进程的用户
  • resourcemanager 就是指明 resourcemanager 进程,其他类似的有 namenode、zkfc、historyserver 等
  • ${hostname} 是 resourcemanager 进程所在机器的 hostname

当日志到达一定的大小(可以在 ${HADOOP_HOME}/etc/hadoop/log4j.properties 文件中配置)将会被切割出一个新的文件,切割出来的日志文件名类似 yarn-${USER}-resourcemanager-${hostname}.log.数字 的形式,后面的数字越大,代表日志越旧。在默认情况下,只保存前 20 个日志文件,比如下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[hadoop@master logs]$ ll
总用量 7360356
-rw-rw-r--. 1 hadoop hadoop 6251772 10月 16 13:59 hadoop-hadoop-datanode-devhost21.log
-rw-rw-r--. 1 hadoop hadoop 722 10月 16 10:52 hadoop-hadoop-datanode-devhost21.out
-rw-rw-r--. 1 hadoop hadoop 722 9月 17 15:09 hadoop-hadoop-datanode-devhost21.out.1
-rw-rw-r--. 1 hadoop hadoop 722 9月 15 23:10 hadoop-hadoop-datanode-devhost21.out.2
-rw-rw-r--. 1 hadoop hadoop 722 8月 17 20:34 hadoop-hadoop-datanode-devhost21.out.3
-rw-rw-r--. 1 hadoop hadoop 151078937 2月 8 19:16 hadoop-hadoop-datanode-master.log
-rw-rw-r--. 1 hadoop hadoop 268479664 12月 8 10:00 hadoop-hadoop-datanode-master.log.1
-rw-rw-r--. 1 hadoop hadoop 268471403 11月 14 11:34 hadoop-hadoop-datanode-master.log.2
-rw-rw-r--. 1 hadoop hadoop 268439864 11月 8 08:30 hadoop-hadoop-datanode-master.log.3
-rw-rw-r--. 1 hadoop hadoop 268435710 8月 17 19:00 hadoop-hadoop-datanode-master.log.4
-rw-rw-r--. 1 hadoop hadoop 268445084 8月 16 21:33 hadoop-hadoop-datanode-master.log.5
-rw-rw-r--. 1 hadoop hadoop 722 1月 4 14:09 hadoop-hadoop-datanode-master.out
-rw-rw-r--. 1 hadoop hadoop 722 12月 20 10:49 hadoop-hadoop-datanode-master.out.1
-rw-rw-r--. 1 hadoop hadoop 722 12月 18 16:13 hadoop-hadoop-datanode-master.out.2
-rw-rw-r--. 1 hadoop hadoop 722 10月 30 15:20 hadoop-hadoop-datanode-master.out.3
-rw-rw-r--. 1 hadoop hadoop 722 10月 30 15:17 hadoop-hadoop-datanode-master.out.4
-rw-rw-r--. 1 hadoop hadoop 722 10月 30 13:14 hadoop-hadoop-datanode-master.out.5

1.2 配置 Hadoop 系统服务日志

1. 配置 log4j 日志的属性参数

比如 resourcemanager(在 ${HADOOP_HOME}/etc/hadoop/log4j.properties):

1
2
3
4
5
6
7
8
9
10
11
log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager
$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger}
log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager
.RMAppManager$ApplicationSummary=false
log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender
log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/
${yarn.server.resourcemanager.appsummary.log.file}
log4j.appender.RMSUMMARY.MaxFileSize=256MB(多大切割日志)
log4j.appender.RMSUMMARY.MaxBackupIndex=20(说明保存最近20个日志文件)
log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n

2. 配置 resourcemanager 日志存放路径

${HADOOP_HOME}/etc/hadoop/yarn-env.sh 文件中

1
2
3
4
# default log directory & file
if [ "$YARN_LOG_DIR" = "" ]; then
YARN_LOG_DIR="$HADOOP_YARN_HOME/logs"
fi

只需要修改 YARN_LOG_DIR 的值,这时候,yarn 相关的日志记录都将存放在你配置的目录下。

二. 历史服务器 (JobHistory Server)

MapReduce 的 JobHistory Server,这是一个独立的服务,可通过 web UI 展示历史作业日志,之所以将其独立出来,是为了减轻 ResourceManager 负担。JobHistory Server 将会分析作业运行日志,并展示作业的启动时间、结束时间、各个任务的运行时间,各种Counter数据等,并产生一个指向作业和任务日志的链接,其默认端口号为 19888。通常可以启动在一台独立的机器上

2.1 历史服务器配置

你需在 mapred-site.xml 中对其进行配置

1
2
3
4
5
6
7
8
9
<property>
<name>mapreduce.jobhistory.address</name>
<value>0.0.0.0:10020</value>
</property>

<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>0.0.0.0:19888</value>
</property>

上面的参数是在 mapred-site.xml 文件中进行配置,mapreduce.jobhistory.address 和 mapreduce.jobhistory.webapp.address 默认的值分别是 0.0.0.0:10020 和 0.0.0.0:19888,大家可以一定要根据自己的情况进行相应的配置,最好别用默认的 0.0.0.0 ,参数的格式是 host:port。

在 Hadoop 历史服务器的 WEB UI 上最多显示 20000 个历史的作业记录信息;其实我们可以在 mapred-site.xml 文件中通过下面的参数进行配置,然后重启一下 Hadoop jobhistory 即可。

1
2
3
4
<property>
<name>mapreduce.jobhistory.joblist.cache.size</name>
<value>20000</value>
</property>

2.2 关于 HA 模式下的历史服务器配置的结论

笔者的集群是 HA 模式的( HDFS 和 ResourceManager HA)。在 ” Hadoop-2.5.0-cdh5.3.2 HA 安装" 中详细讲解了关于 HA 模式的搭建,这里就不再赘述。但网上直接将关于 HA 模式下的历史服务器的配置资料却很少。

笔者在思考,如果配置在 mapred-site.xml 中就设置一台历史服务器,那么当这台机器挂了,那么能不能有另一台机器来承担历史服务器的责任,也就是笔者理想当然的 jobhistory server HA 模式。后面经过各自尝试,得出来的结论是笔者我太年轻了,概念没有搞懂,先总结如下:

  • 历史服务器是个独立的服务,其不会受到 namenode 和 resourcemanager 的 active/standby 切换所带来的影响
  • 当历史服务器突然失效了,那些日志文件依旧存在 HDFS 上。当历史服务器又恢复正常,还是能看到在历史服务器失效期间的运行日志
  • 可以很简单地把历史服务器当成是存在 HDFS 上日志文件的 Web 浏览器。当且仅当历史服务器启动后,才可以通过 Web 查看,比如 http://10.6.3.43:19888/jobhistory
  • 实际上,每台机器的 MapReduce 历史服务器的配置可以不同,当在哪台机器上执行程序时,那么所指向的历史服务器地址其实就是 mapred-site.xml 文件中 mapreduce.jobhistory.webapp.address 配置参数所指定的那台机器

所以 Hadoop HA 模式下的历史服务器配置和非 HA 模式是一样样的,如果你自作聪明(比如笔者),在 mapred-site.xml 文件中,添加了两个运行 namenode(resourcemanager) 进程的主备节点的主机名(或IP地址)。

但是真正在两台主机上同时启动历史服务器进程时,会报如下的类似错误:

INFO org.apache.hadoop.http.HttpServer2: HttpServer.start() threw a non Bind IOException
77504 java.net.BindException: Port in use: master52:19888
Caused by: java.net.BindException: Cannot assign requested address
INFO org.apache.hadoop.service.AbstractService: Service HistoryClientService failed in state STARTED; cause: org.apache.hadoop.yarn.webapp.WebAppException: Error starting http server
INFO org.apache.hadoop.util.ExitUtil: Exiting with status -1

原因就是端口被占用了,很明显如果不改变端口,有且仅有一个 历史服务器成功启动,且启动的那个服务器是在 mapred-site.xml 文件中设置位置最下面的那个,及后面的配置参数将覆盖前一个配置参数。就算改变端口也没卵用…

Note:以上这些是笔者一边操作,一边对比总结,有些结论未必是正确的,还请各位指正…

2.3 启动历史服务器

配置完上述的参数之后,重新启动 Hadoop jobhistory,这样我们就可以在 mapreduce.jobhistory.webapp.address 参数配置的主机上对 Hadoop 历史作业情况经行查看。

只能在 mapred-site.xml 文件中 mapreduce.jobhistory.webapp.address 配置参数所指定的那台机器上执行:

1
sbin/mr-jobhistory-daemon.sh start jobhistoryserver

这样我们就可以在相应机器的 19888 端口上打开历史服务器的 WEB UI 界面。可以查看已经运行完的作业情况。且在 HDFS 上可以看到如下目录:

1
2
3
4
5
6
/tmp
/tmp/hadoop-yarn
/tmp/hadoop-yarn/staging
/tmp/hadoop-yarn/staging/history
/tmp/hadoop-yarn/staging/history/done
/tmp/hadoop-yarn/staging/history/done_intermediate

三. 作业运行日志

3.1 作业运行日志概念

作业运行由 MRAppMaster(MapReduce 作业的 ApplicationMaster)产生,详细记录了作业启动时间、运行时间,每个任务启动时间、运行时间、Counter 值等信息,与 Hadoop 1.0 中的 JobHistory 日志是基本一致。MapReduce 作业的 ApplicationMaster 也运行在 Container 中,且是编号为 000001 的 Container,比如 container_1385051297072_0001_01_000001,它自身可认为是一个特殊的 task,因此,也有自己的运行日志,该日志与 Map Task 和 Reduce Task 类似,但并不是前面介绍的”作业运行日志”。

ApplicationMaster 产生的作业运行日志举例如下,日志采用 apache avro(作为日志存储格式是 Hadoop 2.0 唯一使用到 Avro 的地方)工具,以 json 的格式保存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
"type":"JOB_SUBMITTED",
"event":
{"org.apache.hadoop.mapreduce.jobhistory.JobSubmitted":
{"jobid":"job_1385051297072_0002″,
"jobName":"QuasiMonteCarlo",
"userName":"yarn",
"submitTime":1385393834983,
"jobConfPath":"hdfs://hadoop-test/tmp/hadoop-yarn/staging/yarn/.staging/job_1385051297072_0002/job.xml",
"acls":{},
"jobQueueName":"default",
"workflowId":"",
"workflowName":"",
"workflowNodeName":"",
"workflowAdjacencies":"",
"workflowTags":""
}
}
}
{
"type":"JOB_INITED",
"event":
{"org.apache.hadoop.mapreduce.jobhistory.JobInited":
{"jobid":"job_1385051297072_0002″,
"launchTime":1385393974505,
"totalMaps":8,
"totalReduces":1,
"jobStatus":"INITED",
"uberized":false}
}
}
{
"type":"JOB_INFO_CHANGED",
"event":
{"org.apache.hadoop.mapreduce.jobhistory.JobInfoChange":
{"jobid":"job_1385051297072_0002″,
"submitTime":1385393834983,
"launchTime":1385393974505
}
}
}

3.2 作业运行日志配置

历史作业的记录里面包含了一个作业用了多少个 Map、用了多少个 Reduce、作业提交时间、作业启动时间、作业完成时间等信息;这些信息对分析作业是很有帮助的,我们可以通过这些历史作业记录得到每天有多少个作业运行成功、有多少个作业运行失败、每个队列作业运行了多少个作业等很有用的信息。这些历史作业的信息是通过下面的信息配置的:

在 mapred-site.xml 文件中进行配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>${yarn.app.mapreduce.am.staging-dir}/history/done</value>
</property>

<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate</value>
</property>

<property>
<name>yarn.app.mapreduce.am.staging-dir</name>
<value>/tmp/hadoop-yarn/staging</value>
</property>

3.3 作业运行日志产生过程

1. 启动作业的 ApplicationMaster 并写日志至 HDFS

  • ResourceManager 启动作业的 ApplicationMaster
  • ApplicationMaster 运行过程中,将日志写到 ${yarn.app.mapreduce.am.staging-dir}/yarn/.staging/job_XXXXX_XXX/
  • 参数 yarn.app.mapreduce.am.staging-dir 的默认值是 /tmp/hadoop-yarn/staging
  • 该目录下将存在3个文件,分别是以 “.jhist“、”.summary” 和 “.xml” 结尾的文件,分别表示作业运行日志、作业概要信息和作业配置属性,其中,作业概要信息只有一句话,举例如下:
1
2
3
4
5
jobId=job_1385051297072_0002,submitTime=1385393834983,launchTime=1385393974505,
firstMapTaskLaunchTime=1385393976706,firstReduceTaskLaunchTime=1385393982581,
finishTime=1385393985417,resourcesPerMap=1024,resourcesPerReduce=1024,
numMaps=8,numReduces=1,user=yarn,queue=default,status=SUCCEEDED,
mapSlotSeconds=47,reduceSlotSeconds=5,jobName=QuasiMonteCarlo

2. HDFS 内转移历史运行日志

  • 所有任务运行完成后,意味着,该作业运行完成
  • 此时 ApplicationMaster 将三个文件拷贝到 ${ mapreduce.jobhistory.intermediate-done-dir}/${username} 目录下,拷贝后的文件名后面添加 "_tmp"
  • 其中 mapreduce.jobhistory.intermediate-done-dir 默认值是 ${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate
  • ApplicationMaster 将拷贝完成的三个文件重新命名成 “.jhist”、”.summary” 和 “.xml” 结尾的文件(去掉 "_tmp"

3. 周期转移 done_intermediate 中的日志文件到 done 目录

  • 周期性扫描线程定期将 done_intermediate 的日志文件转移到 done 目录
  • 通过参数 mapreduce.jobhistory.done-dir 配置,默认值为 ${yarn.app.mapreduce.am.staging-dir}/history/done)
  • 同时删除 “.summary” 文件(该文件中的信息,.jhist 文件中都有)
  • ApplicationMaster 移除 ${yarn.app.mapreduce.am.staging-dir}/yarn/.staging/job_XXXXX_XXX/ 目录

四. 任务运行日志 (Container 日志)

4.1 Container 日志基本概念

默认情况下,任务运行日志 (Container 日志) 产只会存放在各 NodeManager 的本地磁盘上,且 NodeManager 将日志保存到 yarn.nodemanager.log-dirs 下 ,该属性缺省值为 ${yarn.log.dir}/userlogs,也就是 Hadoop 安装目录下的 logs/userlogs 目录中,通常为了分摊磁盘负载,我们会为该参数设置多个路径。

需要注意的是,ApplicationMaster 的自身的日志也存放在该路目下,因为它也运行在 Container 之中,是一个特殊的 task。举例如下,其中,第一个是某个作业的 ApplicationMaster 日志(编号是000001)。且里面包含 stderr 、stdout 、 syslog 三个文件。

1
2
3
4
5
container_1449861199315_0036_01_000001
container_1449861199315_0036_01_000023
container_1449861199315_0036_01_000061
container_1449861199315_0036_01_000099
container_1449861199315_0036_01_000137

因为默认情况下,任务运行日志产只会存放在各 NodeManager 的本地磁盘上,而一个集群又有多个 NodeManager,将作业和任务日志存放在各个节点上肯定不便于统一管理和分析,为此,我们可以启用日志聚集功能。打开该功能后,各个任务运行完成后,会将生成的日志推送到 HDFS 的一个目录下,以便集中管理和分析(之前的并不会立即删除,在 HDFS 上,每个任务产生的三个文件,即 syslog、stderr 和 stdout 将合并一个文件,并通过索引记录各自位置)。

4.2 不开启日志聚合时的日志配置

Container 日志包含 ApplicationMaster 日志和普通 Task 日志等信息。默认情况下,这些日志信息是存放在 ${HADOOP_HOME}/logs/userlogs 目录下(在那些 NodeManager 的机子上),我们可以通过下面的配置进行修改:

1
2
3
4
5
6
7
8
9
10
11
<property>
<description>
Where to store container logs. An application's localized log directory
will be found in ${yarn.nodemanager.log-dirs}/application_${appid}.
Individual containers' log directories will be below this, in
directories named container_{$contid}. Each container directory will
contain the files stderr, stdin, and syslog generated by that container.
</description>
<name>yarn.nodemanager.log-dirs</name>
<value>${yarn.log.dir}/userlogs</value>
</property>

4.3 开启日志聚合时的配置参数

日志聚集是 YARN 提供的日志中央化管理功能,它能将运行完成的 Container/ 任务日志上传到 HDFS 上,从而减轻 NodeManager 负载,且提供一个中央化存储和分析机制。默认情况下,Container/ 任务日志存在在各个 NodeManager 上,如果启用日志聚集功能需要额外的配置。

yarn-site.xml 中设置

1. yarn.log-aggregation-enable

  • 参数解释:是否启用日志聚集功能。
  • 默认值:false

2. yarn.log-aggregation.retain-seconds

  • 参数解释:在 HDFS 上聚集的日志最多保存多长时间。
  • 默认值:-1

3. yarn.log-aggregation.retain-check-interval-seconds

  • 参数解释:多长时间检查一次日志,并将满足条件的删除,如果是 0 或者负数,则为上一个值的 1/10。
  • 默认值:-1

4. yarn.nodemanager.remote-app-log-dir

  • 参数解释:当应用程序运行结束后,日志被转移到的HDFS目录(启用日志聚集功能时有效)
  • 默认值:/tmp/logs

5. yarn.nodemanager.remote-app-log-dir-suffix

  • 参数解释:远程日志目录子目录名称(启用日志聚集功能时有效)
  • 默认值:日志将被转移到目录 ${yarn.nodemanager.remote-app-log-dir}/${user}/${thisParam}

4.4 其他配置指导

  • 远端的聚合日志的地址的文件夹权限应该是 1777, ${NMUser}${NMGroup} 是所有者和所有组.
  • 每个应用层次的日志的权限是 770, 但是文件夹所有人是应用的提交者, 文件夹的所有群组是 ${NMGroup}, 这样安排可以让应用的提交者能访问到聚合后的日志, 并且${NMGroup}可以访问和修改日志.
  • ${NMGroup} 应该是一个有限访问的群组, 这样才不会造成访问泄露.

五. 扩展知识

5.1 mapred-site.xml 和 yarn-site.xml 的作用

1. yarn-site.xml
yarn-site.xml 是 YARN 相关的配置文件,客户端、ResourceManager 和 NodeManager 需要改配置文件,为了简单,可让这三类节点上的该文件是一致的。

2. Mapred-site.xml
Mapred-site.xml 是 MapReduce 特有的配置文件,在 YARN 中,mapreduce 已经变成了一个客户端编程库,因此只有客户端和 jobhistory server 需要该配置文件,其他节点,比如 resourceManager 和 NodeManager 不需要,除非你们也把这些节点作为客户端提供给用户使用,另外,一定要让客户端和 jobhistory server 上的 mapres-site.xml 一致。

5.2 权限相关配置参数

注意,配置这些参数前,应充分理解这几个参数的含义,以防止误配给集群带来的隐患。另外,这些参数均需要在 yarn-site.xml 中配置。

这里的权限由三部分组成,分别是:

1. 管理员和普通用户如何区分
管理员列表由参数 yarn.admin.acl 指定。

2. 服务级别的权限
比如哪些用户可以向集群提交 ResourceManager 提交应用程序,服务级别的权限是通过配置 hadoop-policy.xml 实现的,这个与 Hadoop 1.0 类似。

3. 队列级别的权限
比如哪些用户可以向队列A提交作业等。队列级别的权限是由对应的资源调度器内部配置的,比如 Fair Scheduler 或者 Capacity Scheduler 等。

PMML简介

PMML全称预言模型标记模型(Predictive Model Markup Language),以XML 为载体呈现数据挖掘模型。
PMML 允许您在不同的应用程序之间轻松共享预测分析模型。
因此,您可以在一个系统中定型一个模型,在 PMML 中对其进行表达,然后将其移动到另一个系统中,而不需考虑分析和预测过程中的具体实现细节。
使得模型的部署摆脱了模型开发和产品整合的束缚。

PMML标准

PMML 标准是数据挖掘过程的一个实例化标准,它按照数据挖掘任务执行过程,有序的定义了数据挖掘不同阶段的相关信息:
这里写图片描述

  • 头信息 (Header)
  • 数据字典(DataDictionary)
  • 挖掘模式 (Mining Schema)
  • 数据转换(Transformations)
  • 模型定义 (Model Definition)
  • 评分结果 (Score Result)

头信息(Header)

PMML文件使用头信息作为开始,它主要用于记录产品、版权、模型描述,建模时间等描述性信息。例如:

1
2
3
4
5
<Header copyright="Copyright (c) 2017 liaotuo" description="Random Forest Tree Model">
<Extension name="user" value="liaotuo" extender="Rattle/PMML"/>
<Application name="Rattle/PMML" version="1.4"/>
<Timestamp>2017-07-04 16:33:42</Timestamp>
</Header>

其中:

  • Header 是标识头信息部分的起始标记
  • copyright 包含了所记录模型的版权信息
  • description 包含可读的描述性信息
  • Application 描述了生成本文件所含模型的软件产品。
  • Timestamp 记录了模型创建的时间。

数据字典(DataDictionary)

数据字典定义了所有变量的信息,包括预测变量目标变量。这些信息包括变量名,量度和类型等。
对于分类变量,可能包含各种不同类型的分类值,包括有效值 (valid value),遗漏值 (missing value) 和无效值 (invalid value),
它们由 Value 的“property”属性决定;对于连续变量,可以指定一个或多个有效值范围 (Interval)。

对于字段 Age,范围从0到 120 的值是有效值,不在0-120范围值被定义为无效值。
尽管在此没有显示,您可以使用 PMML 元素 MiningSchema 为无效值和遗漏值定义合适的处理方法。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
 <DataDictionary numberOfFields="7"> 
<DataField dataType="double" displayName="Age" name="Age" optype="continuous"/>
<Interval leftMargin="0" rightMargin="120" closure="closedClosed" />
<DataField dataType="string" displayName="Sex" name="Sex" optype="categorical">
<Value displayValue="F" property="valid" value="F"/>
<Value displayValue="M" property="valid" value="M"/>
</DataField>
<DataField dataType="string" displayName="BP" name="BP" optype="categorical">
<Value displayValue="HIGH" property="valid" value="HIGH"/>
<Value displayValue="LOW" property="valid" value="LOW"/>
<Value displayValue="NORMAL" property="valid" value="NORMAL"/>
<Value displayValue="ABNORMAL" property="invalid" value="ABNORMAL"/>
<Value displayValue="MISSING" property="missing" value="MISSING"/>
</DataField>
<DataField dataType="string" displayName="Cholesterol" name="Cholesterol"
optype="categorical">
<Value displayValue="HIGH" property="valid" value="HIGH"/>
<Value displayValue="NORMAL" property="valid" value="NORMAL"/>
</DataField>
<DataField dataType="double" displayName="Na" name="Na" optype="continuous"/>
<DataField dataType="double" displayName="K" name="K" optype="continuous"/>
<DataField dataType="string" displayName="Drug" name="Drug" optype="categorical">
<Value displayValue="drugA" property="valid" value="drugA"/>
<Value displayValue="drugB" property="valid" value="drugB"/>
<Value displayValue="drugC" property="valid" value="drugC"/>
<Value displayValue="drugX" property="valid" value="drugX"/>
<Value displayValue="drugY" property="valid" value="drugY"/>
</DataField>
</DataDictionary>

挖掘模式(Mining Schema)

定义预测变量和目标变量

1
2
3
4
5
6
7
8
9
10
11
<MiningSchema> 
<MiningField importance="0.589759" name="K" usageType="active"/>
<MiningField importance="0.0328595" name="Age" usageType="active"/>
<MiningField importance="0.0249929" name="Na" usageType="active"
outliers=" asExtremeValues" lowValue="0.02" highValue="0.08"/>
<MiningField importance="0.0333406" name="Cholesterol" usageType="active"/>
<MiningField importance="0.307279" name="BP" usageType="active"
missingValueReplacement="HIGH"/>
<MiningField importance="0.0117684" name="Sex" usageType="active"/>
<MiningField name="Drug" usageType="predicted"/>
</MiningSchema>
  • 变量的属性由 “usageType” 值决定,该属性未指定或者值为 “active” 代表预测变量, “predicted”代表目标变量。
    一般来说,一个常见的模型有多个预测变量和一个目标变量, 但是也可以没有预测变量、多个目标变量或者根本没有目标变量。
  • 所有在 Mining Schema 中被引用的变量一定会在数据字典中被定义, 但是不是所有出现在数据字典中的变量会在 Mining Schema 中被应用,
    Mining Schema 定义了数据字典中的一个子集,这个子集才是对模型来说最重要的。

数据转换 (Transformations)

一旦数据字典对数据集做出了定义,那么就可以在其之上进行各种数据转换的预处理操作。这是由于有时用户所提供的数据并不能直接用于建模,
需要将原始的用户数据转换或映射成模型可以识别和使用的数据类型,这就需要使用数据转换来完成。
譬如,神经网络模型内部仅能处理数值型的数据,如果用户数据中含有离散型数据,如性别包含“男”、“女”二值,那在建模前就需要将性别变量映射成 0 和 1 来分别表示“男”和“女”。

PMML 标准支持一些常用的数据转换预处理操作,并在此基础上支持使用函数表达式的转换。以下所列的是标准所定义的一些简单的数据转换操作:

  • 正态化 (Normalization) - 把数据值转化为数值,同时适用于连续性变量和离散变量。
  • 离散化 (Discretization) - 把连续性变量转化为离散变量。
  • 数据映射 (Value mapping) - 把当前离散变量映射成另一种离散性变量。
  • 函数 (Functions) - PMML 内建了大量的常用函数,用户也可以定义自己的函数。
  • 聚合 (Aggregation) - 聚合操作,比如求平均值,最大值,最小值等。

如下:给出了一个使用 Discretization 的示例。通过两个给定的区间将连续型变量“Profit”转换成仅含“negative”和“positive”二值的离散变量。

1
2
3
4
5
6
7
8
9
10
<Discretize field="Profit"> 
<DiscretizeBin binValue="negative">
<Interval closure="openOpen" rightMargin="0"/>
<!-- left margin is -infinity by default -->
</DiscretizeBin>
<DiscretizeBin binValue="positive">
<Interval closure="closedOpen" leftMargin="0"/>
<!-- right margin is +infinity by default -->
</DiscretizeBin>
</Discretize>

如下:给出了一个使用 Functions 的示例,通过使用内建函数 if 和 isMissing 将变量“PREVEXP”中的缺失值替换为指定的均值。
值得注意的是,替换了缺失值之后将产生一个新的变量“PREVEXP_without_missing”。

1
2
3
4
5
6
7
8
9
10
<DerivedField dataType="double" name="PREVEXP_without_missing" 
optype="continuous">
<Apply function="if">
<Apply function="isMissing">
<FieldRef field="PREVEXP"/>
</Apply>
<Constant>mean</Constant>
<FieldRef field="PREVEXP"/>
</Apply>
</DerivedField>

模型定义 (Model Definition)

具体的模型定义,最新的 PMML 4.0.1 定义了一下十三种模型:

  • AssociationModel
  • ClusteringModel
  • GeneralRegressionModel
  • MiningModel
  • NaiveBayesModel
  • NeuralNetwork
  • RegressionModel
  • RuleSetModel
  • SequenceModel
  • SupportVectorMachineModel
  • TextModel
  • TimeSeriesModel
  • TreeModel

这些模型都是帮助使用者从历史性的数据中提取出无法直观发现的,具有推广意义的数据模式。

  • Association model,关联规则模型,常被用来发现大量交易数据中不同产品的购买关系和规则。使用其分析超市的销售单就可以发现,那些购买婴幼儿奶粉和护肤品的客户同时也会以较大的可能性去购买纸尿裤。这样有助于管理人员作出合理的商业决策,有导向的推动购物行为,比如将上述产品放在相邻的购物架上便于客户购买,从而产生更高的销售额。
  • Tree model,树模型,也是很常用的模型,她采用类似树分支的结构将数据逐层划分成节点,而每个叶子节点就表示一个特别的类别。树模型受到应用领域广泛的欢迎,还有一个重要的原因就是她所做出的预测决策易于解释,能够快速推广。为了支持这些模型,PMML 标准提供了大量的语法来有针对性的表示不同的模型。

评分结果(Score Result)

评分结果集可以在输出元素 (Output) 中定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<Output> 
<OutputField name="$R-Drug" displayName="Predicted Value" optype="categorical"
dataType="string" targetField="Drug" feature="predictedValue"/>
<OutputField name="$RC-Drug" displayName="Confidence of Predicted Value"
optype="continuous" dataType="double" targetField="Drug"
feature="standardError"/>
<OutputField name="$RP-drugA" displayName="Probability of drugA"
optype="categorical" dataType="string" targetField="Drug"
feature="probability" value="drugA"/>
<OutputField name="$RP-drugB" displayName="Probability of drugB"
optype="categorical" dataType="string" targetField="Drug"
feature="probability" value="drugB"/>
<OutputField name="$RP-drugC" displayName="Probability of drugC"
optype="categorical" dataType="string" targetField="Drug"
feature="probability" value="drugC"/>
<OutputField name="$RP-drugX" displayName="Probability of drugX"
optype="categorical" dataType="string" targetField="Drug"
feature="probability" value="drugX"/>
<OutputField name="$RP-drugY" displayName="Probability of drugY"
optype="categorical" dataType="string" targetField="Drug"
feature="probability" value="drugY"/>
</Output>

输出元素 : 描述了从模型中获取评分结果值的集合。每一个输出变量指定名称,类型,规则计算和结果特征。
结果特征 (feature): 它是一个结果的标识符 , 它有很多的分类表达,常见统计观念如下:

  • 预测价值(predictedValue):它描述了预测统计的目标值。
  • 概率(probability):它描述预测统计的目标值的概率值。
  • 标准误差(standardError):它描述了标准误差的预测数值。

其他类型参见 http://dmg.org/pmml/v4-3/Output.html

样例pmml

LR.pmml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<?xml version="1.0"?>
<PMML version="4.3" xmlns="http://www.dmg.org/PMML-4_3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.dmg.org/PMML-4_3 http://www.dmg.org/pmml/v4-3/pmml-4-3.xsd">
<Header copyright="Copyright (c) 2017 liaotuo" description="Generalized Linear Regression Model">
<Extension name="user" value="liaotuo" extender="Rattle/PMML"/>
<Application name="Rattle/PMML" version="1.4"/>
<Timestamp>2017-07-11 13:18:36</Timestamp>
</Header>
<DataDictionary numberOfFields="4">
<DataField name="am" optype="continuous" dataType="double"/>
<DataField name="cyl" optype="continuous" dataType="double"/>
<DataField name="hp" optype="continuous" dataType="double"/>
<DataField name="wt" optype="continuous" dataType="double"/>
</DataDictionary>
<GeneralRegressionModel modelName="General_Regression_Model" modelType="generalizedLinear" functionName="regression" algorithmName="glm" distribution="binomial" linkFunction="logit">
<MiningSchema>
<MiningField name="am" usageType="predicted"/>
<MiningField name="cyl" usageType="active"/>
<MiningField name="hp" usageType="active"/>
<MiningField name="wt" usageType="active"/>
</MiningSchema>
<Output>
<OutputField name="Predicted_am" feature="predictedValue"/>
</Output>
<ParameterList>
<Parameter name="p0" label="(Intercept)"/>
<Parameter name="p1" label="cyl"/>
<Parameter name="p2" label="hp"/>
<Parameter name="p3" label="wt"/>
</ParameterList>
<FactorList/>
<CovariateList>
<Predictor name="cyl"/>
<Predictor name="hp"/>
<Predictor name="wt"/>
</CovariateList>
<PPMatrix>
<PPCell value="1" predictorName="cyl" parameterName="p1"/>
<PPCell value="1" predictorName="hp" parameterName="p2"/>
<PPCell value="1" predictorName="wt" parameterName="p3"/>
</PPMatrix>
<ParamMatrix>
<PCell parameterName="p0" df="1" beta="19.7028827927103"/>
<PCell parameterName="p1" df="1" beta="0.487597975045672"/>
<PCell parameterName="p2" df="1" beta="0.0325916758086386"/>
<PCell parameterName="p3" df="1" beta="-9.14947126999654"/>
</ParamMatrix>
</GeneralRegressionModel>
</PMML>

rule.pmml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<RuleSetModel modelName="RiskEval" functionName="classification" algorithmName="RuleSet">
<MiningSchema>
<MiningField name="var973" usageType="active" invalidValueTreatment="asMissing" missingValueReplacement="0"/>
<MiningField name="var969" usageType="active" invalidValueTreatment="asMissing" missingValueReplacement="0"/>
<MiningField name="var20" usageType="active" invalidValueTreatment="asMissing" missingValueReplacement="0"/>
<MiningField name="var393" usageType="active" invalidValueTreatment="asMissing" missingValueReplacement="0"/>
<MiningField name="var868" usageType="active" invalidValueTreatment="asMissing" missingValueReplacement="0"/>
<MiningField name="var543" usageType="active" invalidValueTreatment="asMissing" missingValueReplacement="0"/>
<MiningField name="var1213" usageType="active" invalidValueTreatment="asMissing" missingValueReplacement="0.0"/>
<MiningField name="flg" usageType="target"/>
</MiningSchema>
<RuleSet defaultScore="0">
<RuleSelectionMethod criterion="firstHit"/>
<SimpleRule id="RULE1" score="1" confidence="1">
<CompoundPredicate booleanOperator="or">
<SimplePredicate field="var973" operator="greaterOrEqual" value="1"/>
<SimplePredicate field="var969" operator="greaterOrEqual" value="1"/>
<SimplePredicate field="var20" operator="greaterOrEqual" value="7"/>
<CompoundPredicate booleanOperator="and">
<SimplePredicate field="var393" operator="greaterOrEqual" value="2"/>
<SimplePredicate field="var543" operator="notEqual" value="1"/>
</CompoundPredicate>
<CompoundPredicate booleanOperator="and">
<SimplePredicate field="var868" operator="greaterOrEqual" value="1"/>
<SimplePredicate field="var1213" operator="lessThan" value="0.8"/>
</CompoundPredicate>
</CompoundPredicate>
</SimpleRule>
</RuleSet>
</RuleSetModel>

参考资料

https://blog.csdn.net/c1481118216/article/details/78411200