0%

使用Java操作HBase数据库

一、添加依赖

首先，我们在 Maven 项目的 pom.xml 中添加下面两个依赖：

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- HBase client dependency -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.4.10</version>
</dependency>
<!-- Unit-testing dependency (test scope only) -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.3.2</version>
<scope>test</scope>
</dependency>



二、连接HBase数据库

1. 编写连接数据库的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Holder for a single, lazily created HBase {@link Connection} shared through static accessors.
 *
 * <p>The client configuration is created once, when the singleton instance is constructed, with
 * {@code hbase.zookeeper.quorum} set to {@code localhost:2181}. The connection itself is opened
 * on first use and re-opened on demand after it has been closed.
 *
 * <p>Creation, re-creation and closing of the shared connection are serialized on the class lock
 * so that concurrent callers cannot open (and leak) more than one connection.
 */
public class HBaseConnection {
    // INSTANCE is initialized first, so its constructor populates `configuration`. The two static
    // fields below deliberately have no initializer: one would run afterwards and wipe that value.
    private static final HBaseConnection INSTANCE = new HBaseConnection();
    private static Configuration configuration;
    private static Connection connection;

    /** Builds the shared client configuration; failures are printed and leave it unset. */
    private HBaseConnection() {
        try {
            if (configuration == null) {
                configuration = HBaseConfiguration.create();
                configuration.set("hbase.zookeeper.quorum", "localhost:2181");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the shared connection, opening a new one when none exists or the current one is
     * closed.
     *
     * @return the shared connection; if opening a connection fails the error is printed and the
     *     previous value (possibly {@code null} or a closed connection) is returned
     */
    private Connection getConnection() {
        // Check-then-create must be atomic, otherwise two threads may each open a connection.
        synchronized (HBaseConnection.class) {
            if (connection == null || connection.isClosed()) {
                try {
                    connection = ConnectionFactory.createConnection(configuration);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return connection;
        }
    }

    /**
     * Returns the shared HBase connection, creating it if necessary.
     *
     * @return the shared connection, or {@code null} if no connection could ever be opened
     */
    public static Connection getHBaseConnection() {
        return INSTANCE.getConnection();
    }

    /**
     * Obtains a {@link Table} handle for the given table name from the shared connection.
     *
     * @param tableName name of the table
     * @return a table handle; the caller is responsible for closing it
     * @throws IOException if no connection is available or the table handle cannot be obtained
     */
    public static Table getTable(String tableName) throws IOException {
        Connection current = INSTANCE.getConnection();
        if (current == null) {
            // Report the failure through the declared exception instead of a NullPointerException.
            throw new IOException("HBase connection is not available");
        }
        return current.getTable(TableName.valueOf(tableName));
    }

    /** Closes the shared connection if one has been opened; close errors are printed. */
    public static synchronized void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

2. 编写数据库连接测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Tests for {@link HBaseConnection}: obtaining and closing the shared connection, and obtaining
 * a table handle.
 */
public class HBaseConnectionTest {

    /** Prints the closed state of the shared connection before and after closing it. */
    @Test
    public void getConnectionTest() {
        Connection hBaseConnection = HBaseConnection.getHBaseConnection();
        System.out.println(hBaseConnection.isClosed());
        HBaseConnection.closeConnection();
        System.out.println(hBaseConnection.isClosed());
    }

    /**
     * Obtains a handle for the {@code US_POPULATION} table and prints its name.
     *
     * <p>Exceptions propagate so that a failure actually fails the test instead of being printed
     * and ignored; try-with-resources closes the table even when the body throws.
     *
     * @throws Exception if the table handle cannot be obtained or closed
     */
    @Test
    public void getTableTest() throws Exception {
        try (Table table = HBaseConnection.getTable("US_POPULATION")) {
            System.out.println(table.getName().getNameAsString());
        }
    }
}


三、使用Java实现HBase常见操作

1. 编写操作数据库的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/**
 * Static helpers for common HBase operations (table administration, puts, gets, scans and
 * deletes) built on the connection provided by {@code HBaseConnection}.
 *
 * <p>All methods catch exceptions and print their stack trace. Methods returning {@code boolean}
 * report {@code false} when the operation failed; methods returning a result object report
 * {@code null}.
 */
public class HBaseUtil {

    /**
     * Creates an HBase table with the given column families.
     *
     * @param tableName table name
     * @param cfs names of the column families to create
     * @return {@code true} if the table was created; {@code false} if it already exists or the
     *     creation failed
     */
    public static boolean createTable(String tableName, String[] cfs) {
        try (HBaseAdmin admin = (HBaseAdmin) HBaseConnection.getHBaseConnection().getAdmin()) {
            if (admin.tableExists(tableName)) {
                return false;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            for (String cf : cfs) {
                HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
                columnDescriptor.setMaxVersions(1);
                tableDescriptor.addFamily(columnDescriptor);
            }
            admin.createTable(tableDescriptor);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Disables and then deletes an HBase table.
     *
     * @param tableName table name
     * @return {@code true} if the table was deleted, {@code false} if the operation failed
     */
    public static boolean deleteTable(String tableName) {
        try (HBaseAdmin admin = (HBaseAdmin) HBaseConnection.getHBaseConnection().getAdmin()) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Writes a single cell into a table.
     *
     * @param tableName table name
     * @param roeKey row key of the row to write
     * @param cfName column family name
     * @param qualifier column qualifier
     * @param data cell value
     * @return {@code true} if the put succeeded, {@code false} otherwise
     */
    public static boolean putRow(String tableName, String roeKey, String cfName, String qualifier, String data) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            Put put = new Put(Bytes.toBytes(roeKey));
            put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier), Bytes.toBytes(data));
            table.put(put);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Writes a batch of puts into a table.
     *
     * @param tableName table name
     * @param puts puts to apply
     * @return {@code true} if the batch succeeded, {@code false} otherwise
     */
    public static boolean putRows(String tableName, List<Put> puts) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            table.put(puts);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Reads a single row.
     *
     * @param tableName table name
     * @param rowKey row key
     * @return the result of the get, or {@code null} if the read failed
     */
    public static Result getRow(String tableName, String rowKey) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            return table.get(get);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Reads a single row with a filter list applied to the get.
     *
     * @param tableName table name
     * @param rowKey row key
     * @param filterList filters to apply
     * @return the result of the get, or {@code null} if the read failed
     */
    public static Result getRow(String tableName, String rowKey, FilterList filterList) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            Get get = new Get(Bytes.toBytes(rowKey));
            get.setFilter(filterList);
            return table.get(get);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Opens a scanner over a whole table with scanner caching set to 1000.
     *
     * <p>NOTE(review): the {@link Table} is closed when this method returns while the scanner is
     * handed to the caller; confirm the scanner stays usable after its table is closed for the
     * client version in use.
     *
     * @param tableName table name
     * @return a scanner the caller must close, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            Scan scan = new Scan();
            scan.setCaching(1000);
            return table.getScanner(scan);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Opens a scanner over a row-key range with scanner caching set to 1000.
     *
     * <p>NOTE(review): the {@link Table} is closed when this method returns while the scanner is
     * handed to the caller; confirm the scanner stays usable after its table is closed for the
     * client version in use.
     *
     * @param tableName table name
     * @param startRowKey row key passed to {@code Scan.withStartRow}
     * @param endRowKey row key passed to {@code Scan.withStopRow}
     * @return a scanner the caller must close, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(endRowKey));
            scan.setCaching(1000);
            return table.getScanner(scan);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Opens a filtered scanner over a row-key range with scanner caching set to 1000.
     *
     * <p>NOTE(review): the {@link Table} is closed when this method returns while the scanner is
     * handed to the caller; confirm the scanner stays usable after its table is closed for the
     * client version in use.
     *
     * @param tableName table name
     * @param startRowKey row key passed to {@code Scan.withStartRow}
     * @param endRowKey row key passed to {@code Scan.withStopRow}
     * @param filterList filters to apply
     * @return a scanner the caller must close, or {@code null} if it could not be opened
     */
    public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey, FilterList filterList) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(endRowKey));
            scan.setFilter(filterList);
            scan.setCaching(1000);
            return table.getScanner(scan);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Deletes a whole row.
     *
     * @param tableName table name
     * @param rowKey row key
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    public static boolean deleteRow(String tableName, String rowKey) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            table.delete(delete);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes a column family from a table.
     *
     * @param tableName table name
     * @param cfName column family name
     * @return {@code true} if the column family was deleted, {@code false} otherwise
     */
    public static boolean deleteColumnFamily(String tableName, String cfName) {
        try (HBaseAdmin admin = (HBaseAdmin) HBaseConnection.getHBaseConnection().getAdmin()) {
            admin.deleteColumn(tableName, cfName);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Deletes a single column (family + qualifier) from a row.
     *
     * @param tableName table name
     * @param rowKey row key
     * @param cfName column family name
     * @param qualifier column qualifier
     * @return {@code true} if the delete succeeded, {@code false} otherwise
     */
    public static boolean deleteQualifier(String tableName, String rowKey, String cfName, String qualifier) {
        try (Table table = HBaseConnection.getTable(tableName)) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier));
            table.delete(delete);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}

2. 编写相关测试类进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * Tests that drive {@link HBaseUtil} against the {@code FileTable} table: create the table,
 * insert rows, read and scan them, then delete a row and the table.
 */
public class HBaseUtilTest {

    /** Creates {@code FileTable} with the {@code fileInfo} and {@code saveInfo} column families. */
    @Test
    public void createTable() {
        HBaseUtil.createTable("FileTable", new String[]{"fileInfo", "saveInfo"});
    }

    /** Inserts the cells of two sample rows, {@code rowkey1} and {@code rowkey2}. */
    @Test
    public void addFileDetails() {
        HBaseUtil.putRow("FileTable", "rowkey1", "fileInfo", "name", "file1.txt");
        HBaseUtil.putRow("FileTable", "rowkey1", "fileInfo", "type", "txt");
        HBaseUtil.putRow("FileTable", "rowkey1", "fileInfo", "size", "1024");
        HBaseUtil.putRow("FileTable", "rowkey1", "saveInfo", "creator", "suiwo1");
        HBaseUtil.putRow("FileTable", "rowkey2", "fileInfo", "name", "file2.jpg");
        HBaseUtil.putRow("FileTable", "rowkey2", "fileInfo", "type", "jpg");
        HBaseUtil.putRow("FileTable", "rowkey2", "fileInfo", "size", "2048");
        HBaseUtil.putRow("FileTable", "rowkey2", "saveInfo", "creator", "suiwo2");
    }

    /** Reads {@code rowkey1} and prints its row key and {@code fileInfo:name} value. */
    @Test
    public void getFileDetails() {
        Result result = HBaseUtil.getRow("FileTable", "rowkey1");
        if (result != null) {
            System.out.println("rowkey = " + Bytes.toString(result.getRow()));
            System.out.println("fileName = " + Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
        }
    }

    /**
     * Scans the range {@code rowkey2}..{@code rowkey2} and prints each row's key and
     * {@code fileInfo:name} value.
     */
    @Test
    public void scanFileDetail() {
        // try-with-resources closes the scanner even if iteration throws; a null scanner is skipped.
        try (ResultScanner scanner = HBaseUtil.getScanner("FileTable", "rowkey2", "rowkey2")) {
            if (scanner != null) {
                for (Result result : scanner) {
                    System.out.println("rowkey = " + Bytes.toString(result.getRow()));
                    System.out.println("fileName = " + Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
                }
            }
        }
    }

    /** Deletes the row {@code rowkey1}. */
    @Test
    public void deleteRow() {
        HBaseUtil.deleteRow("FileTable", "rowkey1");
    }

    /** Deletes {@code FileTable}. */
    @Test
    public void deleteTable() {
        HBaseUtil.deleteTable("FileTable");
    }
}