Hudi搭建

Maven


解压文件

1
tar -zxvf /opt/software/apache-maven-3.6.1-bin.tar.gz -C /opt/module/

配置环境变量

1
2
3
4
vi /etc/profile
#maven
export MAVEN_HOME=/opt/module/apache-maven-3.6.1-bin
export PATH=$PATH:$MAVEN_HOME/bin

添加阿里镜像

1
2
3
4
5
6
7
vi /opt/module/maven-3.6.1/conf/settings.xml
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>

修改本地仓库

1
2
vi /opt/module/maven-3.6.1/conf/settings.xml
<localRepository>/opt/software/RepMaven</localRepository>

Hudi


解压文件

1
tar -zxvf /opt/software/hudi-0.11.0.src.tgz -C /opt/module/

修改POM文件

修改依赖组件版本
1
2
3
vim /opt/software/hudi-0.12.0/pom.xml
<hadoop.version>3.1.3</hadoop.version>
<hive.version>3.1.2</hive.version>
修改源码兼容hadoop3.x
1
2
vim /opt/software/hudi-0.12.0/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
try(FSDataOutputStream outputStream) = new FSDataOutputStream(baos)

在括号中加入null,修改后如下

1
try(FSDataOutputStream outputStream) = new FSDataOutputStream(baos,null)

位置在文件110行左右,可以直接搜索baos进行操作

hudi-spark-bundle

修改了Hive版本为3.1.2,其携带的jetty是0.9.3,hudi本身用的0.9.4,存在依赖冲突。修改hudi-spark-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
vim /opt/software/hudi-0.12.0/packaging/hudi-spark-bundle/pom.xml
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
//hive-service添加内容
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
</dependency>

<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
//hive-jdbc添加内容
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
//hive-metastore添加内容
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
<scope>${spark.bundle.hive.scope}</scope>
//hive-common添加内容
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- 增加hudi配置版本的jetty -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${jetty.version}</version>
</dependency>
hudi-utilities-bundle

修改hudi-utilities-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
vim /opt/software/hudi-0.12.0/packaging/hudi-utilities-bundle/pom.xml
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
//hudi-common添加内容
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client-common</artifactId>
<version>${project.version}</version>
//hudi-client-common添加内容
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>


<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
//hive-service添加内容
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
</dependency>

<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
//hive-jdbc添加内容
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>
//hive-metastore添加内容
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
<scope>${utilities.bundle.hive.scope}</scope>、
//hive-common添加内容
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
<artifactId>javax.servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- 增加hudi配置版本的jetty -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${jetty.version}</version>
</dependency>

执行编译命令

  • 打包命令
1
2
mvn clean package
-DskipTests -Dspark3.1 -Dflink1.14 -Dscala-2.12 -Dhadoop.version=3.1.3 -Pflink-bundle-shade-hive3

-DskipTests,不执行测试用例,但编译测试用例类生成相应的class文件至target/test-classes下。

-Dmaven.test.skip=true,不执行测试用例,也不编译测试用例类。

-Dspark -Dflink -Dscala 为对应组件的版本

-Pflink-bundle-shade-hive3 如果不指定此条,会在hudi编译时对flink-bundle的架包报错

需要注意的是,hudi的编译命令需要在hudi的根目录下进行执行


运行案例

移动jar包

我们编译好的jar包存储在hudi根目录下的packaging中,我们需要进入packaging文件夹找到hudi-spark3.1-bundle_2.12-0.11.0.jar架包即可。

1
cd /opt/module/hudi-0.11.0/packaging/hudi-spark-bundle/targe

随后移动架包到spark目录下的jars文件夹中

1
cp /opt/module/hudi-0.11.0/packaging/hudi-spark-bundle/target/hudi-spark3.1-bundle_2.12-0.11.0 /opt/module/spark-3.1.1-bin-hadoop3.2/jars/
进入spark-shell
1
2
3
spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \

–conf ‘spark.serializer=org.apache.spark.serializer.KryoSerializer’ 序列化

–conf ‘spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension’ 集成spark

运行案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()

结果

案例结果图
案例结果图