1. 为什么使用Kudu作为存储介质
- 数据库数据上的快速分析
目前很多业务使用事务型数据库(MySQL、Oracle)做数据分析,把数据写入数据库,然后使用 SQL 进行有效信息提取,当数据规模很小的时候,这种方式确实是立竿见影的,但是当数据量级起来以后,会发现数据库吃不消了或者成本开销太大了,此时就需要把数据从事务型数据库里拷贝出来或者说剥离出来,装入一个分析型的数据库里。发现对于实时性和变更性的需求,目前只有 Kudu 一种组件能够满足需求,所以就产生了这样的一种场景:
MySQL 数据库增、删、改的数据通过 Binlog 实时的被同步到 Kudu 里,同时在 Impala(或者其他计算引擎如 Spark、Hive、Presto、MapReduce)上可以实时的看到。
这种场景也是目前业界使用最广泛的,认可度最高。
- 用户行为日志的快速分析
对于用户行为日志的实时性敏感的业务,比如电商流量、AB 测试、优惠券的点击反馈、广告投放效果以及秒级导入秒级查询等需求,按 Kudu 出现以前的架构基本上都是这张图的模式:
不仅链路长而且实时性得不到有力保障,有些甚至是 T + 1 的,极大的削弱了业务的丰富度。
引入 Kudu 以后,大家看,数据的导入和查询都是在线实时的:
这种场景目前也是网易考拉和hub在使用的,其中hub甚至把 Kudu 当 HBase 来作点查使用。
2. Kudu入门
2.1 Kudu介绍
2.1.1 背景介绍
在Kudu之前,大数据主要以两种方式存储;
- 静态数据:
- 以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。
- 这类存储的局限性是数据无法进行随机的读写。
- 动态数据:
- 以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。
- 这类存储的局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。
从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求。
如上图所示,数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时(通常是 T+1 或者 T+H)将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:HDFS)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景,但它有如下缺点:
- 架构复杂。从架构上看,数据在HBase、消息队列、HDFS 间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。
- 时效性低。数据从HBase导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。
- 难以应对后续的更新。真实场景中,总会有数据是延迟到达的。如果这些数据之前已经从HBase导出到HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。
为了解决上述架构的这些问题,Kudu应运而生。Kudu的定位是Fast Analytics on Fast Data,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎。
从上图可以看出,KUDU 是一个折中的产品,在 HDFS 和 HBase 这两个偏科生中平衡了随机读写和批量分析的性能。从 KUDU 的诞生可以说明一个观点:底层的技术发展很多时候都是上层的业务推动的,脱离业务的技术很可能是空中楼阁。
2.1.2 新的硬件设备
内存(RAM)的技术发展非常快,它变得越来越便宜,容量也越来越大。Cloudera的客户数据显示,他们的客户所部署的服务器,2012年每个节点仅有32GB RAM,现如今增长到每个节点有128GB或256GB RAM。存储设备上更新也非常快,在很多普通服务器中部署SSD也是屡见不鲜。HBase、HDFS、以及其他的Hadoop工具都在不断自我完善,从而适应硬件上的升级换代。然而,从根本上,HDFS基于03年GFS,HBase基于05年BigTable,在当时系统瓶颈主要取决于底层磁盘速度。当磁盘速度较慢时,CPU利用率不足的根本原因是磁盘速度导致的瓶颈,当磁盘速度提高了之后,CPU利用率提高,这时候CPU往往成为系统的瓶颈。HBase、HDFS由于年代久远,已经很难从基本架构上进行修改,而Kudu是基于全新的设计,因此可以更充分地利用RAM、I/O资源,并优化CPU利用率。
我们可以理解为:Kudu相比与以往的系统,CPU使用降低了,I/O的使用提高了,RAM的利用更充分了。
2.1.3 Kudu是什么
Apache Kudu是由Cloudera开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。它是一个融合HDFS和HBase的功能的新组件,具备介于两者之间的新存储组件。
Kudu支持水平扩展,并且与Cloudera Impala和Apache Spark等当前流行的大数据查询和分析工具结合紧密。
2.1.4 Kudu的应用场景
Kudu的很多特性跟HBase很像,它支持索引键的查询和修改。Cloudera曾经想过基于Hbase进行修改,然而结论是对HBase的改动非常大,Kudu的数据模型和磁盘存储都与Hbase不同。HBase本身成功的适用于大量的其它场景,因此修改HBase很可能吃力不讨好。最后Cloudera决定开发一个全新的存储系统。
- Strong performance for both scan and random access to help customers simplify complex hybrid architectures(适用于那些既有随机访问,也有批量数据扫描的复合场景)
- High CPU efficiency in order to maximize the return on investment that our customers are making in modern processors(高计算量的场景)
- High IO efficiency in order to leverage modern persistent storage(使用了高性能的存储设备,包括使用更多的内存)
- The ability to upDATe data in place, to avoid extraneous processing and data movement(支持数据更新,避免数据反复迁移)
- The ability to support active-active replicated clusters that span multiple data centers in geographically distant locations(支持跨地域的实时数据备份和查询)
2.1.5 Kudu架构
下图显示了一个具有三个 master 和多个 tablet server 的 Kudu 集群,每个服务器都支持多个 tablet。
它说明了如何使用 Raft 共识来允许 master 和 tablet server 的 leader 和 follow。
此外,tablet server 可以成为某些 tablet 的 leader,也可以是其他 tablet 的 follower。leader 以金色显示,而 follower 则显示为蓝色。
下面是一些基本概念:
角色 | 作用 |
---|---|
Master | 集群中的老大,负责集群管理、元数据管理等功能 |
Tablet Server | 集群中的小弟,负责数据存储,并提供数据读写服务 一个 tablet server 存储了table表的tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。 只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。 |
Table(表) | 一张table是数据存储在Kudu的tablet server中。表具有 schema 和全局有序的primary key(主键)。table 被分成称为 tablets 的 segments。 |
Tablet | 一个 tablet 是一张 table连续的segment,tablet是kudu表的水平分区,类似于google Bigtable的tablet,或者HBase的region。每个tablet存储着一定连续range的数据(key),且tablet两两间的range不会重叠。一张表的所有tablet包含了这张表的所有key空间。与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server之间达成一致性。 |
2.2 Java代码操作Kudu
2.2.1 构建maven工程
2.2.2 导入依赖
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
2.2.3 创建包结构
包名 | 说明 |
---|---|
com.erainm | 代码所在的包目录 |
2.2.4 初始化方法
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.junit.Before;
public class TestKudu {
//定义KuduClient客户端对象
private static KuduClient kuduClient;
//定义表名
private static String tableName = "person";
@Before
public void init() {
//指定master地址
String masterAddress = "node2";
//创建kudu的数据库连接
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
}
//构建表schema的字段信息
//字段名称 数据类型 是否为主键
public ColumnSchema newColumn(String name, Type type, boolean isKey) {
ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
column.key(isKey);
return column.build();
}
}
2.2.5 创建表
@Test
public void createTable() throws KuduException {
//设置表的schema
List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
columns.add(newColumn("CompanyId", Type.INT32, true));
columns.add(newColumn("WorkId", Type.INT32, false));
columns.add(newColumn("Name", Type.STRING, false));
columns.add(newColumn("Gender", Type.STRING, false));
columns.add(newColumn("Photo", Type.STRING, false));
Schema schema = new Schema(columns);
//创建表时提供的所有选项
CreateTableOptions tableOptions = new CreateTableOptions();
//设置表的副本和分区规则
LinkedList<String> list = new LinkedList<String>();
list.add("CompanyId");
//设置表副本数
tableOptions.setNumReplicas(1);
//设置range分区
//tableOptions.setRangePartitionColumns(list);
//设置hash分区和分区的数量
tableOptions.addHashPartitions(list, 3);
try {
kuduClient.createTable("person", schema, tableOptions);
} catch (Exception e) {
e.printStackTrace();
}
kuduClient.close();
}
2.2.6 插入数据
@Test
public void loadData() throws KuduException {
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//创建KuduSession对象 kudu必须通过KuduSession写入数据
KuduSession kuduSession = kuduClient.newSession();
//采用flush方式 手动刷新
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
kuduSession.setMutationBufferSpace(3000);
//准备数据
for(int i=1; i<=10; i++){
Insert insert = kuduTable.newInsert();
//设置字段的内容
insert.getRow().addInt("CompanyId",i);
insert.getRow().addInt("WorkId",i);
insert.getRow().addString("Name","lisi"+i);
insert.getRow().addString("Gender","male");
insert.getRow().addString("Photo","person"+i);
kuduSession.flush();
kuduSession.apply(insert);
}
kuduSession.close();
kuduClient.close();
}
2.2.7 查询数据
@Test
public void queryData() throws KuduException {
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//获取scanner扫描器
KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
KuduScanner scanner = scannerBuilder.build();
//遍历
while(scanner.hasMoreRows()){
RowResultIterator rowResults = scanner.nextRows();
while (rowResults.hasNext()){
RowResult result = rowResults.next();
int companyId = result.getInt("CompanyId");
int workId = result.getInt("WorkId");
String name = result.getString("Name");
String gender = result.getString("Gender");
String photo = result.getString("Photo");
System.out.print("companyId:"+companyId+" ");
System.out.print("workId:"+workId+" ");
System.out.print("name:"+name+" ");
System.out.print("gender:"+gender+" ");
System.out.println("photo:"+photo);
}
}
//关闭
scanner.close();
kuduClient.close();
}
2.2.8 修改数据
@Test
public void upDATEData() throws KuduException {
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//构建kuduSession对象
KuduSession kuduSession = kuduClient.newSession();
//设置刷新数据模式,自动提交
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
//更新数据需要获取UpDATE对象
UpDATE upDATE = kuduTable.newUpDATE();
//获取row对象
PartialRow row = upDATE.getRow();
//设置要更新的数据信息
row.addInt("CompanyId",1);
row.addString("Name","kobe");
//操作这个upDATE对象
kuduSession.apply(upDATE);
kuduSession.close();
}
2.2.9 删除数据
@Test
public void deleteData() throws KuduException {
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
KuduSession kuduSession = kuduClient.newSession();
//获取Delete对象
Delete delete = kuduTable.newDelete();
//构建要删除的行对象
PartialRow row = delete.getRow();
//设置删除数据的条件
row.addInt("CompanyId",2);
kuduSession.flush();
kuduSession.apply(delete);
kuduSession.close();
kuduClient.close();
}
2.2.10 删除表
@Test
public void dropTable() throws KuduException {
//删除表
DeleteTableResponse response = kuduClient.deleteTable(tableName);
//关闭客户端连接
kuduClient.close();
}
2.2.11 kudu的分区方式(结合Impala)
为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。行总是属于单个tablet 。将行分配给 tablet 的方法由在表创建期间设置的表的分区决定。
kudu提供了3种分区方式。
2.2.11.1 Hash Partitioning (哈希分区)
哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一; 哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet 之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。
@Test
public void testHashPartition() throws KuduException {
//设置表的schema
LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
columnSchemas.add(newColumn("WorkId", Type.INT32,false));
columnSchemas.add(newColumn("Name", Type.STRING,false));
columnSchemas.add(newColumn("Gender", Type.STRING,false));
columnSchemas.add(newColumn("Photo", Type.STRING,false));
//创建schema
Schema schema = new Schema(columnSchemas);
//创建表时提供的所有选项
CreateTableOptions tableOptions = new CreateTableOptions();
//设置副本数
tableOptions.setNumReplicas(1);
//设置范围分区的规则
LinkedList<String> parcols = new LinkedList<String>();
parcols.add("CompanyId");
//设置按照那个字段进行range分区
tableOptions.addHashPartitions(parcols,6);
try {
kuduClient.createTable("dog",schema,tableOptions);
} catch (KuduException e) {
e.printStackTrace();
}
kuduClient.close();
}
2.2.11.2 Range Partitioning (范围分区)
范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象.
@Test
public void testRangePartition() throws KuduException {
//设置表的schema
LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
columnSchemas.add(newColumn("WorkId", Type.INT32,false));
columnSchemas.add(newColumn("Name", Type.STRING,false));
columnSchemas.add(newColumn("Gender", Type.STRING,false));
columnSchemas.add(newColumn("Photo", Type.STRING,false));
//创建schema
Schema schema = new Schema(columnSchemas);
//创建表时提供的所有选项
CreateTableOptions tableOptions = new CreateTableOptions();
//设置副本数
tableOptions.setNumReplicas(1);
//设置范围分区的规则
LinkedList<String> parcols = new LinkedList<String>();
parcols.add("CompanyId");
//设置按照那个字段进行range分区
tableOptions.setRangePartitionColumns(parcols);
int count=0;
for(int i =0;i<10;i++){
//范围开始
PartialRow lower = schema.newPartialRow();
lower.addInt("CompanyId",count);
//范围结束
PartialRow upper = schema.newPartialRow();
count +=10;
upper.addInt("CompanyId",count);
//设置每一个分区的范围
tableOptions.addRangePartition(lower,upper);
}
try {
kuduClient.createTable("student",schema,tableOptions);
} catch (KuduException e) {
e.printStackTrace();
}
kuduClient.close();
}
2.2.11.3 Multilevel Partitioning (多级分区)
Kudu 允许一个表在单个表上组合多级分区。
当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点 需求.
@Test
public void testMultilevelPartition() throws KuduException {
//设置表的schema
LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
columnSchemas.add(newColumn("CompanyId", Type.INT32,true));
columnSchemas.add(newColumn("WorkId", Type.INT32,false));
columnSchemas.add(newColumn("Name", Type.STRING,false));
columnSchemas.add(newColumn("Gender", Type.STRING,false));
columnSchemas.add(newColumn("Photo", Type.STRING,false));
//创建schema
Schema schema = new Schema(columnSchemas);
//创建表时提供的所有选项
CreateTableOptions tableOptions = new CreateTableOptions();
//设置副本数
tableOptions.setNumReplicas(1);
//设置范围分区的规则
LinkedList<String> parcols = new LinkedList<String>();
parcols.add("CompanyId");
//hash分区
tableOptions.addHashPartitions(parcols,5);
//range分区
int count=0;
for(int i=0;i<10;i++){
PartialRow lower = schema.newPartialRow();
lower.addInt("CompanyId",count);
count+=10;
PartialRow upper = schema.newPartialRow();
upper.addInt("CompanyId",count);
tableOptions.addRangePartition(lower,upper);
}
try {
kuduClient.createTable("cat",schema,tableOptions);
} catch (KuduException e) {
e.printStackTrace();
}
kuduClient.close();
}
2.2.12 修改表
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
public class AlterTable {
//定义kudu的客户端对象
private static KuduClient kuduClient;
//定义一张表名称
private static String tableName = "person";
@Before
public void init() {
//指定kudu的master地址
String masterAddress = "node2";
//创建kudu的数据库连接
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
}
@Test
public void alterTableAddColumn() {
AlterTableOptions alterTableOptions = new AlterTableOptions();
alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
try {
kuduClient.alterTable(tableName, alterTableOptions);
} catch (KuduException e) {
e.printStackTrace();
}
}
@Test
public void alterTableDeleteColumn(){
AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
try {
kuduClient.alterTable(tableName, alterTableOptions);
} catch (KuduException e) {
e.printStackTrace();
}
}
@Test
public void alterTableAddRangePartition(){
int lowerValue = 110;
int upperValue = 120;
try {
KuduTable kuduTable = kuduClient.openTable(tableName);
List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
boolean flag = true;
for (Partition rangePartition : rangePartitions) {
int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
if(startKey == lowerValue){
flag = false;
}
}
if(flag) {
PartialRow lower = kuduTable.getSchema().newPartialRow();
lower.addInt("Id", lowerValue);
PartialRow upper = kuduTable.getSchema().newPartialRow();
upper.addInt("Id", upperValue);
kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
}else{
System.out.println("分区已经存在,不能重复创建!");
}
} catch (KuduException e) {
e.printStackTrace();
} catch (Exception exception) {
exception.printStackTrace();
}
}
@Test
public void dropTable() throws KuduException {
kuduClient.deleteTable(tableName);
}
}
2.3 Spark操作Kudu
- Spark与KUDU集成支持:
- DDL操作(创建/删除)
- 本地Kudu RDD
- Native Kudu数据源,用于DataFrame集成
- 从kudu读取数据
- 从Kudu执行插入/更新/ upsert /删除
- 谓词下推
- Kudu和Spark SQL之间的模式映射
- 到目前为止,我们已经听说过几个上下文,例如SparkContext,SQLContext,HiveContext, SparkSession,现在,我们将使用Kudu引入一个KuduContext。这是可以在Spark应用程序中广播的主要可序列化对象。此类代表在Spark执行程序中与Kudu Java客户端进行交互。
- KuduContext提供执行DDL操作所需的方法,与本机Kudu RDD的接口,对数据执行更新/插入/删除,将数据类型从Kudu转换为Spark等。
2.3.1 创建表
- 定义kudu的表需要分成5个步骤:
- 提供表名
- 提供schema
- 提供主键
- 定义重要选项;例如:定义分区的schema
- 调用create Table api
- 代码开发
import java.util
import com.erainm.SparkKuduDemo.TABLE_NAME
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{ SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType}
object SparkKuduTest {
def main(args: Array[String]): Unit = {
//构建sparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")
//构建SparkSession对象
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//获取sparkContext对象
val sc: SparkContext = sparkSession.sparkContext
sc.setLogLevel("warn")
//构建KuduContext对象
val kuduContext = new KuduContext("node2:7051", sc)
//1.创建表操作
createTable(kuduContext)
def createTable(kuduContext: KuduContext) = {
//如果表不存在就去创建
if (!kuduContext.tableExists(TABLE_NAME)) {
//构建创建表的表结构信息,就是定义表的字段和类型
val schema: StructType = StructType(
StructField("userId", StringType, false) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) ::
StructField("sex", StringType, false) :: Nil)
//指定表的主键字段
val keys = List("userId")
//指定创建表所需要的相关属性
val options: CreateTableOptions = new CreateTableOptions
//定义分区的字段
val partitionList = new util.ArrayList[String]
partitionList.add("userId")
//添加分区方式为hash分区
options.addHashPartitions(partitionList, 6)
//创建表
kuduContext.createTable(TABLE_NAME, schema, keys, options)
}
}
}
}
定义表时要注意的是Kudu表选项值。你会注意到在指定组成范围分区列的列名列表时我们调用“asJava”方 法。这是因为在这里,我们调用了Kudu Java客户端本身,它需要Java对象(即java.util.List)而不是Scala的List对 象;(要使“asJava”方法可用,请记住导入JavaConverters库。) 创建表后,通过将浏览器指向http//master主机名:8051/tables
- 来查看Kudu主UI可以找到创建的表,通过单击表ID,能够看到表模式和分区信息。
点击Table id 可以观察到表的schema等信息:
2.3.2 DML操作
Kudu支持许多DML类型的操作,其中一些操作包含在Spark on Kudu集成. 包括:
- INSERT - 将DataFrame的行插入Kudu表。请注意,虽然API完全支持INSERT,但不鼓励在Spark中使用它。 使用INSERT是有风险的,因为Spark任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在,INSERT将不允许插入行(导致失败)。相反,我们鼓励使用下面描述 的INSERT_IGNORE。
- INSERT-IGNORE - 将DataFrame的行插入Kudu表。如果表存在,则忽略插入动作。
- DELETE - 从Kudu表中删除DataFrame中的行
- UPSERT - 如果存在,则在Kudu表中更新DataFrame中的行,否则执行插入操作。
- UPDATE - 更新dataframe中的行
2.3.2.1 插入数据insert操作
先创建一张表,然后把数据插入到表中
import java.util
import com.erainm.SparkKuduDemo.{ TABLE_NAME, erainm}
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{ SparkConf, SparkContext}
import org.apache.spark.sql.{ DataFrame, SparkSession}
import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType}
object SparkKuduTest {
//定义样例类
case class erainm(id:Int, name:String, age:Int, sex:Int)
def main(args: Array[String]): Unit = {
//构建sparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("SparkKuduTest").setMaster("local[2]")
//构建SparkSession对象
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//获取sparkContext对象
val sc: SparkContext = sparkSession.sparkContext
sc.setLogLevel("warn")
//构建KuduContext对象
val kuduContext = new KuduContext("node2:7051", sc)
//1.创建表操作
createTable(kuduContext)
def createTable(kuduContext: KuduContext) = {
//如果表不存在就去创建
if (!kuduContext.tableExists(TABLE_NAME)) {
//构建创建表的表结构信息,就是定义表的字段和类型
val schema: StructType = StructType(
StructField("userId", StringType, false) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) ::
StructField("sex", StringType, false) :: Nil)
//指定表的主键字段
val keys = List("userId")
//指定创建表所需要的相关属性
val options: CreateTableOptions = new CreateTableOptions
//定义分区的字段
val partitionList = new util.ArrayList[String]
partitionList.add("userId")
//添加分区方式为hash分区
options.addHashPartitions(partitionList, 6)
//创建表
kuduContext.createTable(TABLE_NAME, schema, keys, options)
}
}
def inserData(session: SparkSession, sc: SparkContext, kuduContext: KuduContext): Unit = {
//定义数据
val data = List(erainm(1, "tom", 30, 1), erainm(2, "mark", 26, 0))
val erainmRDD = sc.makeRDD(data)
import session.implicits._
val dataFrame: DataFrame = erainmRDD.toDF
kuduContext.insertRows(dataFrame, TABLE_NAME)
}
}
}
2.3.2.2 删除数据delete操作
def deleteData(session: SparkSession, kuduContext: KuduContext): Unit = {
//定义数据
val data = List(erainm(1, "tom", 50, 1), erainm(2, "mark", 30, 0))
import session.implicits._
val dataFrame: DataFrame = data.toDF().select("id")
kuduContext.deleteRows(dataFrame, TABLE_NAME)
}
2.3.2.3 更新数据upsert操作
def upDATEData(session: SparkSession, kuduContext: KuduContext): Unit = {
//定义数据
val data = List(erainm(1, "tom", 50, 1), erainm(2, "mark", 30, 0))
import session.implicits._
val dataFrame: DataFrame = data.toDF()
kuduContext.upDATERows(dataFrame, TABLE_NAME)
}
2.3.3 dataFrame操作kudu
2.3.3.1 DataFrameApi读取kudu表中的数据
虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本身调用读/写API。要设置读取,我们需要为Kudu表指定选项,命名我们要读取的表以及为表提供服务的Kudu集群的Kudu主服务器列表。
- 代码示例
def getTableData(sparkSession: SparkSession, kuduMaster: String, tableName: String): Unit = {
//定义map集合,封装kudu的master地址和要读取的表名
val options = Map(
"kudu.master" -> kuduMaster,
"kudu.table" -> tableName
)
sparkSession.read.options(options).kudu.show()
}
2.3.3.2 DataFrameApi写数据到kudu表中
在通过DataFrame API编写时,目前只支持一种模式“append”。尚未实现的“覆盖”模式。
- 代码示例
def dataFrame2Kudu(session: SparkSession, kuduContext: KuduContext): Unit ={
val data = List(erainm(3, "canglaoshi", 14, 0), erainm(4, "xiaowang", 18, 1))
import session.implicits._
val dataFrame = data.toDF
//目前,在kudu中,数据的写入只支持append追加
dataFrame.write.mode("append").options(kuduOptions).kudu
//查看结果
//导包
import org.apache.kudu.spark.kudu._
//加载表的数据,导包调用kudu方法,转换为dataFrame,最后在使用show方法显示结果
sparkSession.read.options(kuduOptions).kudu.show()
}
2.3.3.3 使用sparksql操作kudu表
可以选择使用Spark SQL直接使用INSERT语句写入Kudu表;与’append’类似,INSERT语句实际上将默认使用 UPSERT语义处理;
- 代码示例
def SparkSql2Kudu(sparkSession: SparkSession, sc: SparkContext, kuduMaster: String, tableName: String): Unit = {
//定义map集合,封装kudu的master地址和表名
val options = Map(
"kudu.master" -> kuduMaster,
"kudu.table" -> tableName
)
val data = List(erainm(10, "小张", 30, 0), erainm(11, "小王", 40, 0))
import sparkSession.implicits._
val dataFrame: DataFrame = sc.parallelize(data).toDF
//把dataFrame注册成一张表
dataFrame.createTempView("temp1")
//获取kudu表中的数据,然后注册成一张表
sparkSession.read.options(options).kudu.createTempView("temp2")
//使用sparkSQL的insert操作插入数据
sparkSession.sql("insert into table temp2 select * from temp1")
sparkSession.sql("select * from temp2 where age >30").show()
}
2.3.4 Kudu Native RDD
Spark与Kudu的集成同时提供了kudu RDD.
- 代码示例
val columnsList = List("id", "name", "age", "sex")
val rowRDD: RDD[Row] = kuduContext.kuduRDD(sc, TABLE_NAME, columnsList)
rowRDD.foreach(println(_))
sc.stop()
//session.read.options(kuduOptions).kudu.show()
2.3.5 修改表
def addColumn(kuduContext: KuduContext): Unit ={
val alterTableOptions: AlterTableOptions = new AlterTableOptions
alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build)
try {
kuduContext.syncClient.alterTable(tableName, alterTableOptions)
} catch {
case ex:Exception => ex.printStackTrace()
}
}