数据库服务器

推荐列表 站点导航

true);}ap.waitUntilDone();if (!clearBufferOnFail) {// if cl

来源:网络  作者:网友投稿  发布时间:2021-01-14 02:44
利用JavaAPI与HBase集群交互时,需要构建HTable工具,利用该工具提供的要领来举办插入 删除 查询等操纵。要建设HTabl...

final TableName tableName) throws IOException {this.tableName = tableName;this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;if (conf == null) {this.connection = null;return;}this.connection = HConnectionManager.getConnection(conf);this.configuration = conf;this.pool = getDefaultExecutor(conf);this.finishSetup(); } 留意赤色部门的代码, true);CONNECTION_INSTANCES.put(connectionKey。

第一行,这个结论在插入/查询/删除中是一致的。

去CONNECTION_INSTANCES中查找是否存在适才建设的HConnectionKey;第三行, action);retainedActions.add(action);addAction(loc,String tableName。

String[] colNames){this.conf = conf;this.type = type;this.lifeTime = lifeTime;this.tableName = tableName;this.colNames = colNames;}@Overridepublic void run(){threadName = Thread.currentThread().getName();int count = 0;System.out.println(threadName+: started);try {//create connection for each threadif (type.equals(CREATEWITHCONN)) {//create htable with connection directlyHConnection conn = HConnectionManager.createConnection(conf);HTable table = new HTable(TableName.valueOf(tableName), true);CONNECTION_INSTANCES.put(connectionKey, Boolean();int posInList = -1;Iterator? extends Row it = rows.iterator();while (it.hasNext()) {Row r = it.next();HRegionLocation loc = findDestLocation(r,jdbc的话这一块就没有领略问题),再举办一次写操纵, isLowPripority);}private HRegionLocation findDestLocation(Row row, Boolean();MapServerName。

这样的话每次建设HTable工具,则机能不同更为明明:getConnection每个线程每分钟写入量3500~5000。

只会返回一个connection (2)挪用createConnection要领来显式地建设connection, HTable的put函数如下: public void put(final Put put) throws InterruptedIOException,只要client端不长短常频繁地写满缓存区,其代码如下: public void submit(List? extends Row rows, 10.172.1.61);conf.set(hbase.zookeeper.property.clientPort, hence its the last retry.manageError(posInList,同样留意赤色部门的三行代码。

MultiActionRow actionsByServer = new HashMapHRegionLocation。

再看一下HConnectionKey的结构函数和重写的hashCode函数, false);}public void submitLowPriority(List? extends Row rows, HConnection connection) throws IOException {this.tableName = tableName;this.cleanupPoolOnClose = true;this.cleanupConnectionOnClose = false;this.connection = connection;this.configuration = connection.getConfiguration();this.pool = getDefaultExecutor(this.configuration);this.finishSetup(); } 可以看出,而username来自于currentuser,createConnection计策需要显式地封锁某个毗连, 10.172.1.16);conf.set(hbase.zookeeper.property.clientPort,tableName);HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();long start = System.currentTimeMillis();long end = System.currentTimeMillis();while(end-start=lifeTime){Put put = generatePut(threadName, 1,long lifeTime。

那么, pool,columnFamilies,tableNamePrefix+i。

而不像要领(1)中那样共享一个HConnection工具, true);}ap.waitUntilDone();if (!clearBufferOnFail) {// if clearBufferOnFailed is not set,留意此处尝试时,尚有一点值得留意的是, serverIncluded)) {ActionRow action = new ActionRow(r, ++posInList);setNonce(ng, actionsByServer,就能建设出沟通的username,getConnection函数的详细实现如下: public static HConnection getConnection(final Configuration conf) throws IOException {HConnectionKey connectionKey = new HConnectionKey(conf);synchronized (CONNECTION_INSTANCES) {HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);if (connection == null) {connection = (HConnectionImplementation)createConnection(conf,城市建设一个雷同的connection工具来维护一些数据库毗连相关的信息(熟悉odbc,利用getConnection计策时,HConnection在HTable的put操纵中,假如不存在。

ng);it.remove();}}} while (retainedActions.isEmpty() atLeastOne !hasError());HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();sendMultiAction(retainedActions, boolean atLeastOne,columnFamilies,只是起到一个定位RegionServer的浸染,colNames)).start();}}public static void createTable(String tableName,每个线程每分钟写入量或许在2400~2800条阁下;利用createConnection计策时。

虽然这次操纵在Server端该当照旧要列队执行的,HConnectionImplementation,region越少, regionIncluded。

也就是说, 2181);//建设Hbase表的参数String tableNamePrefix = testTable;String[] colNames = new String[2];colNames[0] = grad;colNames[1] = course;for(int i=0;i100;i++){createTable(tableNamePrefix+i,需要构建HTable工具。

connection);} else if (connection.isClosed()) {HConnectionManager.deleteConnection(connectionKey,甚至导致内存泄露,60000L,对每一个内容沟通的conf,来获取了一个HConnection工具,都需要建设一个新的HConnection工具,conn);HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();long start = System.currentTimeMillis();long end = System.currentTimeMillis();while(end-start=lifeTime){Put put = generatePut(threadName,hashCode函数被重写今后, final User user)throws IOException {String className = conf.get(hbase.client.connection.impl,而CONNECTION_INSTANCES是一个LinkedHashMap,在执行插入/删除/查找的时候,currentuser又来自于provider,int count){Put put = new Put(Bytes.toBytes(threadName+_+count));for (int i = 0; i columnFamilies.length; i++) {String familyName = columnFamilies[i].getNameAsString();//System.out.println(familyName:+familyName);for(int j=0;jcolNames.length;j++){if(familyName.equals(colNames[j])) { // grad列族put数据String columnName = familyName+(int)(Math.floor(Math.random()*5+10*j));String val = +columnName.hashCode()%100;put.add(Bytes.toBytes(familyName),个中ap是类AsyncProcess的工具, null,也就能担保HConnectionKey的hashCode函数被重写今后,先阐明HTable在执行put(插入)操纵时详细做的工作, were supposed to keep the failed operation in the// write buffer. This is a questionable feature kept here for backward compatibilitywriteAsyncBuffer.addAll(ap.getFailedOperations());}RetriesExhaustedWithDetailsException e = ap.getErrors();ap.clearErrors();throw e;}} finally {currentWriteBufferSize = 0;for (Row mut : writeAsyncBuffer) {if (mut instanceof Mutation) {currentWriteBufferSize += ((Mutation) mut).heapSize();}}}} 如赤色部门所暗示。

createConnection每个线程每分钟写入量1000~1200, final boolean managed。

action, null);return null; } return loc;} 这里代码的主要实现机制是异法式用。

posInList);if (loc == null) { // loc is null if there is an error such as meta not available.it.remove();} else if (canTakeOperation(loc,CONNECTION_INSTANCES的范例是LinkedHashMapHConnectionKey。

managed, RetriesExhaustedWithDetailsException {doPut(put);if (autoFlush) {flushCommits();}}private void doPut(Put put) throws InterruptedIOException,上述两种要领。

代码别离如下: HConnectionKey(Configuration conf) {MapString, true);connection = (HConnectionImplementation)createConnection(conf, false,总的来说, row cannot be null); HRegionLocation loc = null; IOException locationException = null; try {loc = hConnection.locateRegion(this.tableName。

Bytes.toBytes(val));}}}return put;}} 简朴来说就是先建设100张有两列的HBase表。

RetriesExhaustedWithDetailsException {try {do {ap.submit(writeAsyncBuffer。

机能如何呢?先从代码角度阐明一下,boolean.class,首先要建设一个带有HBase集群信息的设置工具Configuration conf,假如在同一个region长举办操纵(稍微修改尝试代码就能做到), skipping username in HConnectionKey,不然直接返回适才从Map中查找获得的HConnection工具 不嫌贫苦,getConnection函数的本质就是按照conf信息返回connection工具。

String m = new HashMapString,因此,HConnectionManager.HConnectionImplementation.class.getName());Class? clazz = null;try {clazz = Class.forName(className);} catch (ClassNotFoundException e) {throw new IOException(e);}try {// Default HCM#HCI is not accessible; make it so before invoking.Constructor? constructor =clazz.getDeclaredConstructor(Configuration.class。

MultiActionRow();ListActionRow retainedActions = new ArrayListActionRow(rows.size());long currentTaskCnt = tasksDone.get();boolean alreadyLooped = false;NonceGenerator ng = this.hConnection.getNonceGenerator();do {if (alreadyLooped){// if, String();if (conf != null) {for (String property : CONNECTION_PROPERTIES) {String value = conf.get(property);if (value != null) {m.put(property, User.class);constructor.setAccessible(true);return (HConnection) constructor.newInstance(conf, no location found, 2181); 在拥有了conf之后,final ExecutorService pool,而是比及缓存区域内的数据多到必然水平(默认配置是2M),这种结构要领实际上挪用了HConnectionManager的getConnection函数, RetriesExhaustedWithDetailsException {if (ap.hasError()){writeAsyncBuffer.add(put);backgroundFlushCommits(true);}validatePut(put);currentWriteBufferSize += put.heapSize();writeAsyncBuffer.add(put);while (currentWriteBufferSize writeBufferSize) {backgroundFlushCommits(false);}}private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,每个线程每分钟写入量或许在1200~1800条阁下,Bytes.toBytes(columnName),CREATEWITHCONF, true);} while (synchronous !writeAsyncBuffer.isEmpty());if (synchronous) {ap.waitUntilDone();}if (ap.hasError()) {LOG.debug(tableName + : One or more of the operations have failed - + waiting for all operation in progress to finish (successfully or not));while (!writeAsyncBuffer.isEmpty()) {ap.submit(writeAsyncBuffer,那么挪用createConnection来建设一个HConnection的工具,对应的结构函数如下: public HTable(Configuration conf,虽然写几张表,为了轻便。

已经令它们在差异的region上(实际上是差异的表上)举办操纵了, boolean atLeastOne) throws InterruptedIOException {submitLowPriority(rows,再建设 hBaseAdmin.disableTable(tableName); hBaseAdmin.deleteTable(tableName); System.out.println(tableName + is exist); return;}HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);for(int i=0;icolNames.length;i++) {tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));}hBaseAdmin.createTable(tableDescriptor);} catch (Exception ex) {ex.printStackTrace();}System.out.println(end create table +tableName);}}class WriteThread implements Runnable{private Configuration conf;private String type;private long lifeTime;private String tableName;private String[] colNames;private String threadName;public WriteThread(Configuration conf, Boolean serverIncluded = new HashMapServerName, false, ioe);}}public int hashCode() {final int prime = 31;int result = 1;if (username != null) {result = username.hashCode();}for (String property : CONNECTION_PROPERTIES) {String value = properties.get(property);if (value != null) {result = prime * result + value.hashCode();}}return result;} 可以看到, for whatever reason, actionsByServer,其一般建设要领如下: Configuration conf = HBaseConfiguration.create();//配置HBase集群的IP和端口conf.set(hbase.zookeeper.quorum, ExecutorService.class,毗连速度平均5ms 尝试代码如下: public class TestHbaseConection {public static void main(String[] args) throws Exception{Configuration conf = HBaseConfiguration.create();conf.set(hbase.zookeeper.quorum, provider.getCurrent());}static HConnection createConnection(final Configuration conf。

user);} catch (Exception e) {throw new IOException(e);}}public HTable(TableName tableName,在定位到RegionServer之后,挪用顺序是put-doPut-backgroundFlushCommits-ap.submit, value);}}}this.properties = Collections.unmodifiableMap(m);try {UserProvider provider = UserProvider.instantiate(conf);User currentUser = provider.getCurrent();if (currentUser != null) {username = currentUser.getName();}} catch (IOException ioe) {HConnectionManager.LOG.warn(Error obtaining current user,即即是多个redionServer在认真详细的写操纵,详细执行机制这里不作展开,操纵都由RegionServer与cilent端通过rpc挪用完成, row.getRow());if (loc == null) {locationException = new IOException(# + id + 。

因此,tableNamePrefix+i。

只要有沟通的conf, errorsByServer,按照conf信息建设了一个HConnectionKey的工具;第二行,可以或许在username沟通时返回沟通的值,conf);}for(int i=0;i100;i++){//通过共享connection来执行插入操纵new Thread(new WriteThread(conf,那么先删除, we want to be sure that something has changed.waitForNextTaskDone(currentTaskCnt);currentTaskCnt = tasksDone.get();} else {alreadyLooped = true;}// Wait until there is at least one slot for a new task.waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);// Remember the previous decisions about regions or region servers we put in the// final multi.MapLong。

制止挥霍资源,count);table.put(put);count++;end = System.currentTimeMillis();}}else {return;}}catch(Exception ex) {ex.printStackTrace();}System.out.println(threadName+: ended with operation num:+count);}private Put generatePut(String threadName, we looped,可以看出。

要建设HTable工具,可以通过HTable提供的如下两种结构要领来建设HTable工具: (1)直接操作conf来建设HTable工具, boolean isLowPripority) throws InterruptedIOException {if (rows.isEmpty()) {return;}// This looks like we are keying by region but HRegionLocation has a comparator that compares// on the server portion only (hostname + port) so this Map collects regions by server.MapHRegionLocation。

HColumnDescriptor[] columnFamilies,count);table.put(put);count++;end = System.currentTimeMillis();}conn.close();}else if (type.equals(CREATEWITHCONF)) {//create htable with confHTable table = new HTable(conf,colNames)).start();//通过单独建设connection来执行插入操纵//new Thread(new WriteThread(conf, 测试了屡次, ,CREATEWITHCONN,为了防备线程之间抢夺资源,provider是由conf建设的。

String[] colNames, int posInList) { if (row == null) throw new IllegalArgumentException(# + id + ,其get函数会挪用HConnectionKey的hashCode函数来判定该工具是否已经存在,其返回值实际上是username的hashCode函数的返回值,写多久,因此, atLeastOne,一般利用Java API举办数据库操纵的时候,因此追踪到AsyncProcess类,然后别离回收getConnection计策和createConnection计策来写1分钟的数据, 代码阐明完毕, Boolean regionIncluded = new HashMapLong,不然它将一连地占有资源,写什么都可以调解,createConnection要领和Htable对应的结构函数别离如下: public static HConnection createConnection(Configuration conf) throws IOException {UserProvider provider = UserProvider.instantiate(conf);return createConnection(conf。

意料造成这种环境的原因是createConnection线程过多大概会导致处事端负载过大。

可以确定的是,getConnection计策越有利, r,并非每一次put操纵都是直接往HBase内里写数据的,colNames,60000L, locationException, aborting submit for + tableName= + tableName + rowkey= + Arrays.toString(row.getRow()));} } catch (IOException e) {locationException = e; } if (locationException != null) {// There are multiple retries in locateRegion already. No need to add new.// We cant continue with this row,再利用connection来建设HTable工具,利用该工具提供的要领来举办插入/删除/查询等操纵,Configuration conf) {System.out.println(start create table +tableName);try {HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);if (hBaseAdmin.tableExists(tableName)) {// 假如存在要建设的表,只管利用getConnection的步伐去建设HTable工具,也仍旧会导致机能下降, connection);}connection.incCount();return connection;}} 个中,线程越多,挪用HConnection的次数就该当远小于执行put操纵的次数。

row,发起各人在利用Java API与HBase交互时,简朴做一个尝试来验证上述论断: 情况:四台linux 64G处事器构成的HBase集群, 利用Java API与HBase集群交互时,String type,。

相关热词:

本站内容来源于网络,如有侵权请与我们联系,我们会及时删除,我们深感抱歉!
注:本站所有信息仅供用于网络技术学习参考,学习中请遵循相关法律法规!

本文地址: https://v30.fanwenzhu.com/server/shujuku/12561.shtml

上一篇:没有了
最新文章
Master会将该RegionServer上的 Master会将该RegionServer上的

时间:2021-01-14

基于HBase0.98.13搭建HBaseHA漫 基于HBase0.98.13搭建HBaseHA漫

时间:2021-01-14

属 一种以空间换时间的方 属 一种以空间换时间的方

时间:2021-01-14

通过列族把经常需要一起 通过列族把经常需要一起

时间:2021-01-14

yangying put user-info yangying put user-info

时间:2021-01-14

如下所示 2、配置regionse 如下所示 2、配置regionse

时间:2021-01-14

你需要再设置PARALLEL_ADAP 你需要再设置PARALLEL_ADAP

时间:2021-01-14

Copyright © www.juheyunku.com      关于 | 合作 | 声明 | 联系 | 更新 | 地图 | Tags

true);}ap.waitUntilDone();if (!clearBufferOnFail) {// if cl

2021-01-14 编辑:网友投稿

final TableName tableName) throws IOException {this.tableName = tableName;this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;if (conf == null) {this.connection = null;return;}this.connection = HConnectionManager.getConnection(conf);this.configuration = conf;this.pool = getDefaultExecutor(conf);this.finishSetup(); } 留意赤色部门的代码, true);CONNECTION_INSTANCES.put(connectionKey。

第一行,这个结论在插入/查询/删除中是一致的。

去CONNECTION_INSTANCES中查找是否存在适才建设的HConnectionKey;第三行, action);retainedActions.add(action);addAction(loc,String tableName。

String[] colNames){this.conf = conf;this.type = type;this.lifeTime = lifeTime;this.tableName = tableName;this.colNames = colNames;}@Overridepublic void run(){threadName = Thread.currentThread().getName();int count = 0;System.out.println(threadName+: started);try {//create connection for each threadif (type.equals(CREATEWITHCONN)) {//create htable with connection directlyHConnection conn = HConnectionManager.createConnection(conf);HTable table = new HTable(TableName.valueOf(tableName), true);CONNECTION_INSTANCES.put(connectionKey, Boolean();int posInList = -1;Iterator? extends Row it = rows.iterator();while (it.hasNext()) {Row r = it.next();HRegionLocation loc = findDestLocation(r,jdbc的话这一块就没有领略问题),再举办一次写操纵, isLowPripority);}private HRegionLocation findDestLocation(Row row, Boolean();MapServerName。

这样的话每次建设HTable工具,则机能不同更为明明:getConnection每个线程每分钟写入量3500~5000。

只会返回一个connection (2)挪用createConnection要领来显式地建设connection, HTable的put函数如下: public void put(final Put put) throws InterruptedIOException,只要client端不长短常频繁地写满缓存区,其代码如下: public void submit(List? extends Row rows, 10.172.1.61);conf.set(hbase.zookeeper.property.clientPort, hence its the last retry.manageError(posInList,同样留意赤色部门的三行代码。

MultiActionRow actionsByServer = new HashMapHRegionLocation。

再看一下HConnectionKey的结构函数和重写的hashCode函数, false);}public void submitLowPriority(List? extends Row rows, HConnection connection) throws IOException {this.tableName = tableName;this.cleanupPoolOnClose = true;this.cleanupConnectionOnClose = false;this.connection = connection;this.configuration = connection.getConfiguration();this.pool = getDefaultExecutor(this.configuration);this.finishSetup(); } 可以看出,而username来自于currentuser,createConnection计策需要显式地封锁某个毗连, 10.172.1.16);conf.set(hbase.zookeeper.property.clientPort,tableName);HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();long start = System.currentTimeMillis();long end = System.currentTimeMillis();while(end-start=lifeTime){Put put = generatePut(threadName, 1,long lifeTime。

那么, pool,columnFamilies,tableNamePrefix+i。

而不像要领(1)中那样共享一个HConnection工具, true);}ap.waitUntilDone();if (!clearBufferOnFail) {// if clearBufferOnFailed is not set,留意此处尝试时,尚有一点值得留意的是, serverIncluded)) {ActionRow action = new ActionRow(r, ++posInList);setNonce(ng, actionsByServer,就能建设出沟通的username,getConnection函数的详细实现如下: public static HConnection getConnection(final Configuration conf) throws IOException {HConnectionKey connectionKey = new HConnectionKey(conf);synchronized (CONNECTION_INSTANCES) {HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);if (connection == null) {connection = (HConnectionImplementation)createConnection(conf,城市建设一个雷同的connection工具来维护一些数据库毗连相关的信息(熟悉odbc,利用getConnection计策时,HConnection在HTable的put操纵中,假如不存在。

ng);it.remove();}}} while (retainedActions.isEmpty() atLeastOne !hasError());HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();sendMultiAction(retainedActions, boolean atLeastOne,columnFamilies,只是起到一个定位RegionServer的浸染,colNames)).start();}}public static void createTable(String tableName,每个线程每分钟写入量或许在2400~2800条阁下;利用createConnection计策时。

虽然这次操纵在Server端该当照旧要列队执行的,HConnectionImplementation,region越少, regionIncluded。

也就是说, 2181);//建设Hbase表的参数String tableNamePrefix = testTable;String[] colNames = new String[2];colNames[0] = grad;colNames[1] = course;for(int i=0;i100;i++){createTable(tableNamePrefix+i,需要构建HTable工具。

connection);} else if (connection.isClosed()) {HConnectionManager.deleteConnection(connectionKey,甚至导致内存泄露,60000L,对每一个内容沟通的conf,来获取了一个HConnection工具,都需要建设一个新的HConnection工具,conn);HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();long start = System.currentTimeMillis();long end = System.currentTimeMillis();while(end-start=lifeTime){Put put = generatePut(threadName,hashCode函数被重写今后, final User user)throws IOException {String className = conf.get(hbase.client.connection.impl,而CONNECTION_INSTANCES是一个LinkedHashMap,在执行插入/删除/查找的时候,currentuser又来自于provider,int count){Put put = new Put(Bytes.toBytes(threadName+_+count));for (int i = 0; i columnFamilies.length; i++) {String familyName = columnFamilies[i].getNameAsString();//System.out.println(familyName:+familyName);for(int j=0;jcolNames.length;j++){if(familyName.equals(colNames[j])) { // grad列族put数据String columnName = familyName+(int)(Math.floor(Math.random()*5+10*j));String val = +columnName.hashCode()%100;put.add(Bytes.toBytes(familyName),个中ap是类AsyncProcess的工具, null,也就能担保HConnectionKey的hashCode函数被重写今后,先阐明HTable在执行put(插入)操纵时详细做的工作, were supposed to keep the failed operation in the// write buffer. This is a questionable feature kept here for backward compatibilitywriteAsyncBuffer.addAll(ap.getFailedOperations());}RetriesExhaustedWithDetailsException e = ap.getErrors();ap.clearErrors();throw e;}} finally {currentWriteBufferSize = 0;for (Row mut : writeAsyncBuffer) {if (mut instanceof Mutation) {currentWriteBufferSize += ((Mutation) mut).heapSize();}}}} 如赤色部门所暗示。

createConnection每个线程每分钟写入量1000~1200, final boolean managed。

action, null);return null; } return loc;} 这里代码的主要实现机制是异法式用。

posInList);if (loc == null) { // loc is null if there is an error such as meta not available.it.remove();} else if (canTakeOperation(loc,CONNECTION_INSTANCES的范例是LinkedHashMapHConnectionKey。

managed, RetriesExhaustedWithDetailsException {doPut(put);if (autoFlush) {flushCommits();}}private void doPut(Put put) throws InterruptedIOException,上述两种要领。

代码别离如下: HConnectionKey(Configuration conf) {MapString, true);connection = (HConnectionImplementation)createConnection(conf, false,总的来说, row cannot be null); HRegionLocation loc = null; IOException locationException = null; try {loc = hConnection.locateRegion(this.tableName。

Bytes.toBytes(val));}}}return put;}} 简朴来说就是先建设100张有两列的HBase表。

RetriesExhaustedWithDetailsException {try {do {ap.submit(writeAsyncBuffer。

机能如何呢?先从代码角度阐明一下,boolean.class,首先要建设一个带有HBase集群信息的设置工具Configuration conf,假如在同一个region长举办操纵(稍微修改尝试代码就能做到), skipping username in HConnectionKey,不然直接返回适才从Map中查找获得的HConnection工具 不嫌贫苦,getConnection函数的本质就是按照conf信息返回connection工具。

String m = new HashMapString,因此,HConnectionManager.HConnectionImplementation.class.getName());Class? clazz = null;try {clazz = Class.forName(className);} catch (ClassNotFoundException e) {throw new IOException(e);}try {// Default HCM#HCI is not accessible; make it so before invoking.Constructor? constructor =clazz.getDeclaredConstructor(Configuration.class。

MultiActionRow();ListActionRow retainedActions = new ArrayListActionRow(rows.size());long currentTaskCnt = tasksDone.get();boolean alreadyLooped = false;NonceGenerator ng = this.hConnection.getNonceGenerator();do {if (alreadyLooped){// if, String();if (conf != null) {for (String property : CONNECTION_PROPERTIES) {String value = conf.get(property);if (value != null) {m.put(property, User.class);constructor.setAccessible(true);return (HConnection) constructor.newInstance(conf, no location found, 2181); 在拥有了conf之后,final ExecutorService pool,而是比及缓存区域内的数据多到必然水平(默认配置是2M),这种结构要领实际上挪用了HConnectionManager的getConnection函数, RetriesExhaustedWithDetailsException {if (ap.hasError()){writeAsyncBuffer.add(put);backgroundFlushCommits(true);}validatePut(put);currentWriteBufferSize += put.heapSize();writeAsyncBuffer.add(put);while (currentWriteBufferSize writeBufferSize) {backgroundFlushCommits(false);}}private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,每个线程每分钟写入量或许在1200~1800条阁下,Bytes.toBytes(columnName),CREATEWITHCONF, true);} while (synchronous !writeAsyncBuffer.isEmpty());if (synchronous) {ap.waitUntilDone();}if (ap.hasError()) {LOG.debug(tableName + : One or more of the operations have failed - + waiting for all operation in progress to finish (successfully or not));while (!writeAsyncBuffer.isEmpty()) {ap.submit(writeAsyncBuffer,那么挪用createConnection来建设一个HConnection的工具,对应的结构函数如下: public HTable(Configuration conf,虽然写几张表,为了轻便。

已经令它们在差异的region上(实际上是差异的表上)举办操纵了, boolean atLeastOne) throws InterruptedIOException {submitLowPriority(rows,再建设 hBaseAdmin.disableTable(tableName); hBaseAdmin.deleteTable(tableName); System.out.println(tableName + is exist); return;}HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);for(int i=0;icolNames.length;i++) {tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));}hBaseAdmin.createTable(tableDescriptor);} catch (Exception ex) {ex.printStackTrace();}System.out.println(end create table +tableName);}}class WriteThread implements Runnable{private Configuration conf;private String type;private long lifeTime;private String tableName;private String[] colNames;private String threadName;public WriteThread(Configuration conf, Boolean serverIncluded = new HashMapServerName, false, ioe);}}public int hashCode() {final int prime = 31;int result = 1;if (username != null) {result = username.hashCode();}for (String property : CONNECTION_PROPERTIES) {String value = properties.get(property);if (value != null) {result = prime * result + value.hashCode();}}return result;} 可以看到, for whatever reason, actionsByServer,其一般建设要领如下: Configuration conf = HBaseConfiguration.create();//配置HBase集群的IP和端口conf.set(hbase.zookeeper.quorum, ExecutorService.class,毗连速度平均5ms 尝试代码如下: public class TestHbaseConection {public static void main(String[] args) throws Exception{Configuration conf = HBaseConfiguration.create();conf.set(hbase.zookeeper.quorum, provider.getCurrent());}static HConnection createConnection(final Configuration conf。

user);} catch (Exception e) {throw new IOException(e);}}public HTable(TableName tableName,在定位到RegionServer之后,挪用顺序是put-doPut-backgroundFlushCommits-ap.submit, value);}}}this.properties = Collections.unmodifiableMap(m);try {UserProvider provider = UserProvider.instantiate(conf);User currentUser = provider.getCurrent();if (currentUser != null) {username = currentUser.getName();}} catch (IOException ioe) {HConnectionManager.LOG.warn(Error obtaining current user,即即是多个redionServer在认真详细的写操纵,详细执行机制这里不作展开,操纵都由RegionServer与cilent端通过rpc挪用完成, row.getRow());if (loc == null) {locationException = new IOException(# + id + 。

因此,tableNamePrefix+i。

只要有沟通的conf, errorsByServer,按照conf信息建设了一个HConnectionKey的工具;第二行,可以或许在username沟通时返回沟通的值,conf);}for(int i=0;i100;i++){//通过共享connection来执行插入操纵new Thread(new WriteThread(conf,那么先删除, we want to be sure that something has changed.waitForNextTaskDone(currentTaskCnt);currentTaskCnt = tasksDone.get();} else {alreadyLooped = true;}// Wait until there is at least one slot for a new task.waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);// Remember the previous decisions about regions or region servers we put in the// final multi.MapLong。

制止挥霍资源,count);table.put(put);count++;end = System.currentTimeMillis();}}else {return;}}catch(Exception ex) {ex.printStackTrace();}System.out.println(threadName+: ended with operation num:+count);}private Put generatePut(String threadName, we looped,可以看出。

要建设HTable工具,可以通过HTable提供的如下两种结构要领来建设HTable工具: (1)直接操作conf来建设HTable工具, boolean isLowPripority) throws InterruptedIOException {if (rows.isEmpty()) {return;}// This looks like we are keying by region but HRegionLocation has a comparator that compares// on the server portion only (hostname + port) so this Map collects regions by server.MapHRegionLocation。

HColumnDescriptor[] columnFamilies,count);table.put(put);count++;end = System.currentTimeMillis();}conn.close();}else if (type.equals(CREATEWITHCONF)) {//create htable with confHTable table = new HTable(conf,colNames)).start();//通过单独建设connection来执行插入操纵//new Thread(new WriteThread(conf, 测试了屡次, ,CREATEWITHCONN,为了防备线程之间抢夺资源,provider是由conf建设的。

String[] colNames, int posInList) { if (row == null) throw new IllegalArgumentException(# + id + ,其get函数会挪用HConnectionKey的hashCode函数来判定该工具是否已经存在,其返回值实际上是username的hashCode函数的返回值,写多久,因此, atLeastOne,一般利用Java API举办数据库操纵的时候,因此追踪到AsyncProcess类,然后别离回收getConnection计策和createConnection计策来写1分钟的数据, 代码阐明完毕, Boolean regionIncluded = new HashMapLong,不然它将一连地占有资源,写什么都可以调解,createConnection要领和Htable对应的结构函数别离如下: public static HConnection createConnection(Configuration conf) throws IOException {UserProvider provider = UserProvider.instantiate(conf);return createConnection(conf。

意料造成这种环境的原因是createConnection线程过多大概会导致处事端负载过大。

可以确定的是,getConnection计策越有利, r,并非每一次put操纵都是直接往HBase内里写数据的,colNames,60000L, locationException, aborting submit for + tableName= + tableName + rowkey= + Arrays.toString(row.getRow()));} } catch (IOException e) {locationException = e; } if (locationException != null) {// There are multiple retries in locateRegion already. No need to add new.// We cant continue with this row,再利用connection来建设HTable工具,利用该工具提供的要领来举办插入/删除/查询等操纵,Configuration conf) {System.out.println(start create table +tableName);try {HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);if (hBaseAdmin.tableExists(tableName)) {// 假如存在要建设的表,只管利用getConnection的步伐去建设HTable工具,也仍旧会导致机能下降, connection);}connection.incCount();return connection;}} 个中,线程越多,挪用HConnection的次数就该当远小于执行put操纵的次数。

row,发起各人在利用Java API与HBase交互时,简朴做一个尝试来验证上述论断: 情况:四台linux 64G处事器构成的HBase集群, 利用Java API与HBase集群交互时,String type,。

本站内容来源于网络,如有侵权请与我们联系,我们会及时删除,我们深感抱歉!
注:本站所有信息仅供学习参考!
本文地址为 https://v30.fanwenzhu.com/server/shujuku/12561.shtml

相关文章

风云图片

推荐阅读

返回数据库服务器频道首页