Spring程序使用三种方式在Apache Ignite中存取数据
本文基于分布式内存数据库ignite的2.12.0版本,展示使用自动序列化、BinaryObjectBuilder、sql三种方式存取数据的demo
创建数据表 (cache)
准备集群,集群server节点需要使用四个端口:
10800 (JDBC/ODBC), 11211 (TCP connector), 47100 (listener), 47500 (discovery)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Name of this Ignite instance. -->
<property name="igniteInstanceName" value="test-grid-xx1-yy1"/>
<!-- Custom node attributes: a single "rack" attribute with value "xx1". -->
<property name="userAttributes">
<map>
<entry key="rack" value="xx1"/>
</map>
</property>
<!-- Enable peer class loading. -->
<property name="peerClassLoadingEnabled" value="true"/>
<!-- Set deployment mode. -->
<property name="deploymentMode" value="ISOLATED"/>
<!-- Turn authentication on for this node. -->
<property name="authenticationEnabled" value="true"/>
<!-- Enable the task execution events and the cache object events listed below. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
<!--Cache events: object put, read and removed-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>
<!-- SQL schemas declared in the node configuration: DEV, SIT, UAT and VER. -->
<property name="sqlConfiguration">
<bean class="org.apache.ignite.configuration.SqlConfiguration">
<property name="sqlSchemas">
<list>
<value>DEV</value>
<value>SIT</value>
<value>UAT</value>
<value>VER</value>
</list>
</property>
</bean>
</property>
<!-- Caches created when the node starts. -->
<property name="cacheConfiguration">
<list>
<!-- Partitioned, transactional cache named "test" with one backup copy. -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="test"/>
<property name="backups" value="1"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
<!-- Group the cache belongs to. -->
<property name="groupName" value="DEV"/>
</bean>
</list>
</property>
<!-- Data storage settings: only the default data region is configured here. -->
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<!--
Default data region. A cache is bound to this region unless it sets another one
in its CacheConfiguration. The region does not grow endlessly: it is capped by maxSize below.
-->
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="name" value="Default_Region"/>
<!-- Persistence is switched on for this region. -->
<property name="persistenceEnabled" value="true"/>
<!-- The region starts at 100 MiB (initialSize) and may grow up to 100 GiB (maxSize). -->
<property name="initialSize" value="#{100L * 1024 * 1024}"/>
<property name="maxSize" value="#{100L * 1024 * 1024 * 1024}"/>
<!-- Enabling SEGMENTED_LRU page replacement for this region. -->
<property name="pageReplacementMode" value="SEGMENTED_LRU"/>
<property name="metricsEnabled" value="true"/>
<!-- Warm-up strategy for this region: LoadAllWarmUpConfiguration. -->
<property name="warmUpConfiguration">
<bean class="org.apache.ignite.configuration.LoadAllWarmUpConfiguration"/>
</property>
</bean>
</property>
</bean>
</property>
<!--
Explicitly configure TCP discovery SPI to provide list of
initial nodes from the first cluster.
This node binds discovery to 10.0.0.1, starting at port 47500 with a port range of 20.
-->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="localAddress" value="10.0.0.1"/>
<property name="localPort" value="47500"/>
<property name="localPortRange" value="20"/>
<!-- Timeout and reconnect tuning; the time values are presumably milliseconds (verify against the Ignite docs). -->
<property name="ackTimeout" value="#{3L * 1000}"/>
<property name="reconnectDelay" value="2000"/>
<property name="reconnectCount" value="5"/>
<property name="connectionRecoveryTimeout" value="#{60L * 1000}"/>
<!-- Setting up IP finder for this cluster -->
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="shared" value="true"/>
<!-- Static address list: 10.0.0.2 and 10.0.0.3, each probed on ports 47500..47520. -->
<property name="addresses">
<list>
<value>10.0.0.2:47500..47520</value>
<value>10.0.0.3:47500..47520</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<!--
Explicitly configure TCP communication SPI changing local
port number for the nodes from the first cluster.
The listener starts at port 47100 with a port range of 20.
-->
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="localPort" value="47100"/>
<property name="localPortRange" value="20"/>
<!-- Connection tuning; the time values are presumably milliseconds (verify against the Ignite docs). -->
<property name="connectTimeout" value="#{5L * 1000}"/>
<property name="reconnectCount" value="12"/>
<property name="idleConnectionTimeout" value="#{60L * 1000}"/>
<!-- Paired connections are enabled, with 3 connections per node. -->
<property name="usePairedConnections" value="true"/>
<property name="connectionsPerNode" value="3"/>
</bean>
</property>
</bean>
</beans>
登录到ignite,创建用户
./sqlline.sh --verbose=true -u 'jdbc:ignite:thin://10.0.0.1:10800,10.0.0.2:10800/SYS'
#默认管理账户是ignite:ignite,一般需要创建自定义用户
CREATE USER "wzp" WITH PASSWORD 'wzp';
#查看常用的运维信息:https://ignite.apache.org/docs/latest/monitoring-metrics/system-views
select * from SYS.NODES;
select * from SYS.CACHES;
select * from SYS.SCHEMAS;
select * from SYS.TABLES;
select * from SYS.TABLE_COLUMNS;
select * from SYS.BINARY_METADATA;
使用create table来创建cache,可以避免部署自定义类到server节点并支持使用二级索引;
table对应的cache的名称为SQL_<SCHEMA_NAME>_<TABLE_NAME>,即:SQL_DEV_PERSON
-- Creates table PERSON in schema DEV with id as the primary key.
-- The WITH clause selects the PARTITIONED template, cache group DEV, one backup,
-- TRANSACTIONAL atomicity, and com.wzp.ignite.bo.Person as the value type.
CREATE TABLE IF NOT EXISTS DEV.PERSON (
id int,
city_id int,
name varchar,
age int,
company varchar,
PRIMARY KEY (id)
) WITH "TEMPLATE=PARTITIONED,CACHE_GROUP=DEV,BACKUPS=1,ATOMICITY=TRANSACTIONAL,VALUE_TYPE=com.wzp.ignite.bo.Person";
初始化spring bean
/**
 * Creates the Ignite thin client bean.
 *
 * <p>The client connects to 10.0.0.1:10800 and 10.0.0.2:10800 as user "wzp", with partition
 * awareness turned off. Transactions default to OPTIMISTIC concurrency, REPEATABLE_READ
 * isolation and a timeout value of 10000.
 *
 * @return the started thin client
 */
@Bean
public IgniteClient igniteClient() {
    ClientConfiguration clientCfg = new ClientConfiguration();
    clientCfg.setAddresses("10.0.0.1:10800", "10.0.0.2:10800");
    clientCfg.setUserName("wzp");
    clientCfg.setUserPassword("wzp");
    clientCfg.setPartitionAwarenessEnabled(false);

    // Defaults applied to transactions started through this client.
    ClientTransactionConfiguration txDefaults = new ClientTransactionConfiguration();
    txDefaults.setDefaultTxTimeout(10000);
    txDefaults.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC);
    txDefaults.setDefaultTxIsolation(TransactionIsolation.REPEATABLE_READ);
    clientCfg.setTransactionConfiguration(txDefaults);

    return Ignition.startClient(clientCfg);
}
/**
 * Registers the Spring transaction manager backed by the Ignite thin client
 * under the bean name "igniteTransactionManager".
 *
 * @param cli thin client the manager delegates to
 * @return the configured transaction manager
 */
@Bean("igniteTransactionManager")
public IgniteClientSpringTransactionManager igniteTransactionManager(IgniteClient cli) {
    IgniteClientSpringTransactionManager txManager = new IgniteClientSpringTransactionManager();
    txManager.setClientInstance(cli);
    return txManager;
}
使用自动序列化机制 (Key-Value API)
ignite实质是key-value数据库,会自动使用BinaryObject格式把value(POJO对象)进行序列化后存储
package com.wzp.ignite.bo;
/**
 * POJO stored as the value of the PERSON table's cache.
 *
 * <p>Its fully qualified name, {@code com.wzp.ignite.bo.Person}, is the name used as
 * {@code VALUE_TYPE} in the table definition. Accessors are generated by Lombok's {@code @Data}.
 */
@lombok.Data
public class Person implements java.io.Serializable {
private static final long serialVersionUID = 1L;
private int id;
private String name;
private int age;
private String company;
// Even with @QuerySqlField configured, automatic serialization cannot map camelCase to
// snake_case (cityId -> city_id), so the field carries the column name directly.
private int city_id;
}
只要保证table定义里的VALUE_TYPE和POJO对象的类名一致,那么sql也能查询到使用Key-Value API存入的POJO数据。如果使用复合主键,那么需要设置好table定义里的KEY_TYPE。
@Autowired
IgniteClient ignite;
private void insertObject(int key) {
ClientCache<Integer, Person> cache = ignite.cache("SQL_" schemaName "_" "PERSON");
Person person = new Person();
person.setId(key);
person.setCity_id(1001);
person.setAge(1);
person.setName("Serializable");
person.setCompany("MIDEA");
cache.put(key, person);
}
private Person getObject(int key) {
ClientCache<Integer, Person> cache = ignite.cache("SQL_" schemaName "_" "PERSON");
return cache.get(key);
}
如需原子性地修改几行数据,在public方法上加上注解:
@Transactional(transactionManager = "igniteTransactionManager")
使用BinaryObjectBuilder (Key-Value API)
如果程序里不存在Person类的定义,也可以使用BinaryObjectBuilder操作Table表数据;
builder的名称需要与table定义里的VALUE_TYPE一致:
private void insertBinaryObject(int key) {
ClientCache<Integer, BinaryObject> binaryCache = ignite.cache("SQL_" schemaName "_" "PERSON").withKeepBinary();
BinaryObjectBuilder builder = ignite.binary().builder("com.wzp.ignite.bo.Person");
builder.setField("id", key);
builder.setField("city_id", 1001);
builder.setField("name", "BinaryObjectBuilder");
builder.setField("age", 30);
builder.setField("company", "MIDEA");
binaryCache.put(key, builder.build());
}
/**
 * Reads a row by key in binary form, without deserializing it into a POJO.
 *
 * @param key cache key of the row
 * @return whatever the keep-binary view of the cache returns for {@code key}
 */
private BinaryObject getBinaryObject(int key) {
    ClientCache<Integer, BinaryObject> keepBinaryView = ignite.cache(cacheName).withKeepBinary();
    BinaryObject stored = keepBinaryView.get(key);
    return stored;
}
/**
* 用key-value api操作使用复合主键的表
*/
private void insertBinaryObject(int userId, String month) {
ClientCache<BinaryObject, BinaryObject> binaryCache = ignite.cache("SQL_" schemaName "_" "SALLARY")
.withKeepBinary();
// 定义表时,指定KEY_TYPE
BinaryObjectBuilder keyBuilder = ignite.binary().builder("com.wzp.ignite.bo.SallaryKey");
keyBuilder.setField("user_id", userId);
keyBuilder.setField("month", month);
BinaryObjectBuilder valueBuilder = ignite.binary().builder("com.wzp.ignite.bo.SallaryValue");
valueBuilder.setField("salary_before_tax", 100000L);
valueBuilder.setField("tax", 3L);
binaryCache.put(keyBuilder.build(), valueBuilder.build());
}
使用SQL (SQL API)
瘦客户端模式下必须使用SQL API才能在服务端进行数据筛选,并利用到二级索引;
通过setSchema来切换schema
/**
 * Inserts a sample person row through the SQL API.
 *
 * <p>The statement runs against the schema named by {@code schemaName}.
 *
 * @param key value bound to the {@code id} column
 */
private void insertBySql(int key) {
    ClientCache<Integer, Person> personCache = ignite.cache(cacheName);
    SqlFieldsQuery insert = new SqlFieldsQuery(
            "INSERT INTO person(id, city_id, name, age, company) VALUES(?, ?, ?, ?, ?)");
    insert.setArgs(key, 1001, "SQL", 30, "Midea");
    insert.setSchema(schemaName);
    personCache.query(insert).getAll();
}
private Person selectBySql(int key) {
ClientCache<Integer, Person> cache = ignite.cache(cacheName);
try (FieldsQueryCursor<List<?>> cursor = cache
.query(new SqlFieldsQuery("select * from person where id = ?").setArgs(key)
.setSchema(schemaName))) {
for (List<?> row : cursor) {
Map<String,Object> map=new HashMap<>();
for (int i = 0; i < row.size(); i ) {
// 列名作为key
map.put(cursor.getFieldName(i), row.get(i));
}
// 使用jackson进行反序列化
ObjectMapper mapper = new ObjectMapper();
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
return mapper.convertValue(map, Person.class);
}
}
return null;
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfeeikj
系列文章
更多
同类精品
更多
-
解决Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources......
-
JavaWeb实现文件上传和下载
-
PageOffice在线编辑office文件和Apache POI的区别
-
Apache HTTP Server <2.4.56 mod_proxy 模块存在请求走私漏洞CVE-2023-25690
-
dolphinscheduler 3.0.1代码下载编译和部署
-
Exception in thread “main“ java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$PO
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13