标签:Apache-Ignite-1.4.0-中文开发手册

[TOC]

3.数据网格

3.1.数据网格

Ignite针对越来越火的水平扩展概念而构建,具有实时按需增加节点的能力。他可以支持线性扩展到几百个节点,通过数据位置的强语义以及数据关系路由来降低冗余数据噪声。 Ignite数据网格是一个基于内存的分布式键值存储,他可以视为一个分布式的分区化哈希,每个集群节点都持有所有数据的一部分,这意味着随着集群节点的增加,就可以缓存更多的数据。 与其他键值存储系统不同,Ignite通过可插拔的哈希算法来决定数据的位置,每个客户端都可以通过一个加入一个哈希函数决定一个键属于哪个节点,而不需要任何特定的映射服务器或者name节点。 Ignite数据网格支持本地、复制的、分区化的数据集,允许使用标准SQL语法方便地进行跨数据集查询,同时还支持在内存数据中进行分布式SQL关联。 Ignite数据网格轻量快速,是目前在集群中支持数据的事务性和原子性的最快的实现之一。

数据一致性 只要集群仍然处于活动状态,即使节点崩溃或者网络拓扑发生变化,Ignite也会保证不同集群节点中的数据的一致性。

JCache (JSR107) Ignite实现了JCache(JSR107)规范。 特性一览

  • 内存内的分布式缓存
  • 轻量级高性能
  • 弹性扩展
  • 内存内分布式事务
  • Web Session集群化
  • Hibernate 2级缓存
  • 分层堆外存储
  • 支持关联的分布式ANSI-99 SQL查询

3.1.1.IgniteCache

IgniteCache接口是Ignite缓存实现的入口,提供了保存和获取数据,执行查询等等。 JCACHE IgniteCache接口从JCache规范继承了javax.cache.Cache接口,然后又加入了其他的功能,主要是与本地和分布式的操作、查询、度量等等。 如下的代码显示了如何获得IgniteCache的实例:

Ignite ignite = Ignition.ignite();
// Obtain instance of cache named "myCache".
// Note that different caches may have different generics.
IgniteCache<Integer, String> cache = ignite.cache("myCache");

也可以动态地创建缓存的实例,这时Ignite会在所有的集群节点上创建和部署缓存。

Ignite ignite = Ignition.ignite();

CacheConfiguration cfg = new CacheConfiguration();

cfg.setName("myCache");
cfg.setAtomicityMode(TRANSACTIONAL);

// Create cache with given name, if it does not exist.
IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cfg);

XML配置: 在任意集群节点通过Ignite Spring XML配置定义的缓存会自动创建和部署在所有的集群节点上(不需要在每一个节点上指定相同的配置)。

3.2.超越JCache

Ignite是JCache(JSR107)规范的一个实现,JCache为数据访问提供了简单易用且功能强大的API,然而规范忽略了任何有关数据分布以及一致性的细节来允许开发商在自己的实现中有足够的自由度。 可以通过JCache实现:

  • 基本缓存操作
  • ConcurrentMap API
  • 并行处理(EntryProcessor)
  • 事件和度量
  • 可插拔的持久化

在JCache之外,Ignite还提供了ACID事务,数据查询的能力(包括SQL),各种内存模型等。

3.2.1.IgniteCache

IgniteCache是基于JCache(JSR107)的,所以在非常基本的API上可以减少到javax.cache.Cache接口,然而IgniteCache还提供了JCache规范之外的,有用的功能,比如数据加载,查询,异步模型等。 可以从Ignite中直接获得IgniteCache的实例:

Ignite ignite = Ignition.ignite();

IgniteCache cache = ignite.cache("mycache");

3.2.2.基本操作

下面是一些JCache基本原子操作的例子:

try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) {
    IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);

    // Store keys in cache (values will end up on different cache nodes).
    for (int i = 0; i < 10; i++)
        cache.put(i, Integer.toString(i));

    for (int i = 0; i < 10; i++)
        System.out.println("Got [key=" + i + ", val=" + cache.get(i) + ']');
}

3.2.3.EntryProcessor

当在缓存中执行putsupdates操作时,通常需要在网络中发送完整的状态数据,而EntryProcessor可以直接在主节点上处理数据,只需要传输增量数据而不是全量数据。 此外,可以在EntryProcessor中嵌入自定义逻辑,比如,获取之前缓存的数据然后加1. Java8:

IgniteCache<String, Integer> cache = ignite.cache("mycache");

// Increment cache value 10 times.
for (int i = 0; i < 10; i++)
  cache.invoke("mykey", (entry, args) -> {
    Integer val = entry.getValue();

    entry.setValue(val == null ? 1 : val + 1);

    return null;
  });

Java7:

IgniteCache<String, Integer> cache = ignite.jcache("mycache");

// Increment cache value 10 times.
for (int i = 0; i < 10; i++)
  cache.invoke("mykey", new EntryProcessor<String, Integer, Void>() {
    @Override 
    public Object process(MutableEntry<Integer, String> entry, Object... args) {
      Integer val = entry.getValue();

      entry.setValue(val == null ? 1 : val + 1);

      return null;
    }
  });

原子性EntryProcessor通过给键加锁以原子性方式执行。

3.2.4.异步支持

和Ignite中的所有API一样,IgniteCache实现了IgniteAsynchronousSupport接口,因此可以以异步的方式使用。

// Enable asynchronous mode.
IgniteCache<String, Integer> asyncCache = ignite.cache("mycache").withAsync();

// Asynhronously store value in cache.
asyncCache.getAndPut("1", 1);

// Get future for the above invocation.
IgniteFuture<Integer> fut = asyncCache.future();

// Asynchronously listen for the operation to complete.
fut.listenAsync(f -> System.out.println("Previous cache value: " + f.get()));

3.3.缓存模式

Ignite提供了三种不同的缓存操作模式,分区、复制和本地。缓存模型可以为每个缓存单独配置,缓存模型是通过CacheMode枚举定义的。

3.3.1.分区模式

分区模式是扩展性最好的分布式缓存模式,这种模式下,所有数据被均等地分布在分区中,所有的分区也被均等地拆分在相关的节点中,实际上就是为缓存的数据创建了一个巨大的内存内分布式存储。这个方式可以在所有节点上只要匹配总可用内存就可以存储尽可能多的数据,因此,可以在集群的所有节点的内存中可以存储TB级的数据,也就是说,只要有足够多的节点,就可以存储足够多的数据。 与复制模式不同,它更新是很昂贵的,因为集群内的每个节点都需要更新,而分区模式更新就很廉价,因为对于每个键只需要更新一个主节点(可选择一个或者多个备份节点),然而,读取变得较为昂贵,因为只有特定节点才持有缓存的数据。 为了避免额外的数据移动,总是访问恰好缓存有要访问的数据的节点是很重要的,这个方法叫做关系并置,当工作在分区化缓存时强烈建议使用。

分区化缓存适合于数据量很大而更新频繁的场合。

下面的配置章节显示了如何配置缓存模式的例子。

3.3.2.复制模式

复制模式中,所有数据都被复制到集群内的每个节点,因为每个节点都有效所以这个缓存模式提供了最大的数据可用性。然而,这个模式每个数据更新都要传播到其他所有节点,因而会对性能和可扩展性产生影响。 Ignite中,复制缓存是通过分区缓存实现的,每个键都有一个主拷贝而且在集群内的其他节点也会有备份。 因为相同的数据被存储在所有的集群节点中,复制缓存的大小受到RAM最小的节点的有效内存限制。这个模式适用于读缓存比写缓存频繁的多而且数据集较小的场景,如果应用超过80%的时间用于查找缓存,那么就要考虑使用复制缓存模式了。

复制缓存适用于数据集不大而且更新不频繁的场合。

3.3.3.本地模式

本地模式是最轻量的模式,因为没有数据被分布化到其他节点。他适用于或者数据是只读的,或者需要定期刷新的场景中。当缓存数据失效需要从持久化存储中加载数据时,他也可以工作与通读模式。除了分布化以外,本地缓存包括了分布式缓存的所有功能,比如自动数据回收,过期,磁盘交换,数据查询以及事务。

3.3.4.配置

缓存可以每个缓存分别配置,通过设置CacheConfigurationcacheMode属性实现: XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="cacheName"/>

            <!-- Set cache mode. -->
            <property name="cacheMode" value="PARTITIONED"/>
            ... 
        </bean
    </property>
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration("myCache");

cacheCfg.setCacheMode(CacheMode.PARTITIONED);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.3.5.原子有序写模式

当分区缓存使用CacheAtomicityMode.ATOMIC模式时,可以配置成原子有序写模式,原子有序写决定哪个节点会赋予写版本(发送者或者主节点),它由CacheAtomicWriteOrderMode枚举定义,它有两种模式:CLOCKPRIMARYCLOCK有序写模式中,写版本被赋予在一个发送者节点上,当使用CacheWriteSynchronizationMode.FULL_SYNCCLOCK模式会被自动开启,因为它性能更好,因为到主节点和备份节点的写请求是被同时发送的。 PRIMARY有序写模式中,写版本只被赋予到主节点上,这种模式下发送者只会将写请求发送到主节点上然后分配写版本再转发到备份节点上。 原子有序写模式可以通过CacheConfigurationatomicWriteOrderMode属性进行配置。 XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="cacheName"/>

            <!-- Atomic write order mode. -->
            <property name="atomicWriteOrderMode" value="PRIMARY"/>
            ... 
        </bean
    </property>
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

cacheCfg.setAtomicWriteOrderMode(CacheAtomicWriteOrderMode.CLOCK);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

要了解有关原子模式的更多信息,请参照:3.9.事务章节。

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="cacheName"/>

            <!-- Set cache mode. -->
            <property name="cacheMode" value="PARTITIONED"/>

            <!-- Number of backup nodes. -->
            <property name="backups" value="1"/>
            ... 
        </bean
    </property>
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

cacheCfg.setCacheMode(CacheMode.PARTITIONED);

cacheCfg.setBackups(1);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.4.主节点和备份副本

分区模式下,赋予键的节点叫做这些键的主节点,对于缓存的数据,也可以有选择地配置任意多个备份节点。如果副本数量大于0,那么Ignite会自动地为每个独立的键赋予备份节点,比如,如果副本数量为1,那么数据网格内缓存的每个键都会有2个备份,一主一备。

默认,因为性能原因备份是被关闭的。

备份可以通过CacheConfigurationbackups属性进行配置,像下面这样: XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="cacheName"/>

            <!-- Set cache mode. -->
            <property name="cacheMode" value="PARTITIONED"/>

            <!-- Number of backup nodes. -->
            <property name="backups" value="1"/>
            ... 
        </bean
    </property>
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

cacheCfg.setCacheMode(CacheMode.PARTITIONED);

cacheCfg.setBackups(1);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.4.1.同步和异步备份

CacheWriteSynchronizationMode枚举可以用来配置主节点和备份部分的同步和异步更新。同步写模式告诉Ignite在完成写或者提交之前客户端节点是否要等待来自远程节点的响应。 同步写模式可以设置为下面的三种之一:

同步写模式 描述
FULL_SYNC 客户端节点要等待所有相关远程节点的写入或者提交完成(主和备)。
FULL_ASYNC 默认,这种情况下,客户端不需要等待来自相关节点的响应。这时远程节点会在获得他们的状态在任意的缓存写操作完成或者Transaction.commit()方法调用完成之后进行小幅更新。
PRIMARY_SYNC 客户端节点会等待主节点的写或者提交操纵完成,但不会等待备份节点的更新完成。

缓存数据一致性 注意不管那种写同步模式,缓存数据都会保持在所有相关节点上的完整一致性。

写同步模式可以通过CacheConfigurationwriteSynchronizationMode属性进行配置,像下面这样: XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="cacheName"/>

            <!-- Set write synchronization mode. -->
            <property name="writeSynchronizationMode" value="FULL_SYNC"/>       
            ... 
        </bean
    </property>
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.5.近缓存

分区化的缓存也可以通过缓存前移,他是一个较小的本地缓存,可以用来存储最近或者最频繁访问的数据。和分区缓存一样,可以控制近缓存的大小以及回收策略。 近缓存可以通过在Ignite.createNearCache(NearConfiguration)中传入NearConfiguration或者通过调用Ignite.getOrCreateNearCache(NearConfiguration)方法在客户端节点直接创建。 Java:

// Create distributed cache on the server nodes, called "myCache".
ignite.getOrCreateCache(new CacheConfiguration<MyKey, MyValue>("myCache"));

// Create near-cache configuration for "myCache".
NearCacheConfiguration<MyKey, MyValue> nearCfg = new NearCacheConfiguration<>();

// Use LRU eviction policy to automatically evict entries
// from near-cache, whenever it reaches 100_000 in size.
nearCfg.setEvictionPolicy(new LruEvictionPolicy<>(100_000));

// Create near-cache for "myCache".
IgniteCache<MyKey, MyValue> cache = ignite.getOrCreateNearCache("myCache", nearCfg);

在大多数情况下,只要用了Ignite的关系并置,近缓存就不应该用了。如果计算与相应的分区化缓存节点是并置的,那么近缓存根本就不需要了,因为所有数据只在分区化缓存的本地才有效。 然而,有时没有必要将计算任务发送给远端节点,比如近缓存可以显著提升可扩展性或者提升应用的整体性能。

服务端节点的近缓存: 少数情况下,每当以非托管模式从服务器端的分区缓存中访问数据时,都需要通过CacheConfiguration.setNearConfiguration(...)方法在服务端节点上配置近缓存。

3.5.1.配置

下面是NearCacheConfiguration的配置参数:

setter方法 描述 默认值
setNearEvictionPolicy(CacheEvictionPolicy) 近缓存回收策略
setNearStartSize(int) 缓存初始大小 375000

3.6.缓存查询

Ignite提供了非常简洁的查询API,支持:

  • 基于谓词的扫描查询
  • SQL查询
  • 文本查询 对于SQL查询,Ignite提供了内存内的索引,因此所有的数据检索都是非常快的,如果是在堆外内存中缓存数据的,那么查询索引也会缓存在堆外内存中。 Ignite也通过IndexingSpiSpiQuery类提供对自定义索引的支持。

3.6.1.主要的抽象

IgniteCache有若干个查询方法,这些方法可以获得一些Query的子类以及返回QueryCursor

查询: Query抽象类表示一个在分布式缓存上执行的抽象分页查询。可以通过Query.setPageSize(...)方法设置返回游标的每页大小,默认值是1024

查询游标: QueryCursor表示查询的结果集,可以透明地进行一页一页地迭代。每当迭代到每页的最后时,会自动地在后台请求下一页的数据,当不需要分页时,可以使用QueryCursor.getAll()方法,他会获得整个查询结果集然后存储在集合里。

关闭游标: 如果调用了QueryCursor.getAll()方法,游标会自动关闭。如果通过for循环迭代一个游标或者显式地获得Iterator,必须显式地关闭或者使用AutoCloseable语法。

3.6.2.扫描查询

扫描查询可以通过用户定义的谓词以分布式的形式进行缓存的查询。 Java8:

IgniteCache<Long, Person> cache = ignite.cache("mycache");

// Find only persons earning more than 1,000.
try (QueryCursor cursor = cache.query(new ScanQuery((k, p) -> p.getSalary() > 1000)) {
  for (Person p : cursor)
    System.out.println(p.toString());
}

Java7:

IgniteCache<Long, Person> cache = ignite.cache("mycache");

// Find only persons earning more than 1,000.
IgniteBiPredicate<Long, Person> filter = new IgniteByPredicate<>() {
  @Override public boolean apply(Long key, Perons p) {
    return p.getSalary() > 1000;
  }
};

try (QueryCursor cursor = cache.query(new ScanQuery(filter)) {
  for (Person p : cursor)
    System.out.println(p.toString());
}

3.6.3.SQL查询

Ignite的SQL查询请参照SQL查询

3.6.4.文本查询

Ignite也支持通过Lucene索引实现的基于文本的查询。 文本查询:

IgniteCache<Long, Person> cache = ignite.cache("mycache");

// Query for all people with "Master Degree" in their resumes.
TextQuery txt = new TextQuery(Person.class, "Master Degree");

try (QueryCursor<Entry<Long, Person>> masters = cache.query(txt)) {
  for (Entry<Long, Person> e : cursor)
    System.out.println(e.getValue().toString());
}

3.6.5.通过注解进行查询的配置

索引可以在代码中通过@QuerySqlField注解进行配置,来告诉Ignite那个类型要被索引,键值对可以传入CacheConfiguration.setIndexedTypes(MyKey.class, MyValue.class)方法。注意这个方法只会接受成对的类型,一个是键类型,一个是值类型。

Java:

public class Person implements Serializable {
  /** Person ID (indexed). */
  @QuerySqlField(index = true)
  private long id;

  /** Organization ID (indexed). */
  @QuerySqlField(index = true)
  private long orgId;

  /** First name (not-indexed). */
  @QuerySqlField
  private String firstName;

  /** Last name (not indexed). */
  @QuerySqlField
  private String lastName;

  /** Resume text (create LUCENE-based TEXT index for this field). */
  @QueryTextField
  private String resume;

  /** Salary (indexed). */
  @QuerySqlField(index = true)
  private double salary;

  ...
}

3.6.6.通过CacheTypeMetadata进行查询配置

索引和字段也可以通过org.apache.ignite.cache.CacheTypeMetadata进行配置。

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
 <!-- Cache configuration. -->
 <property name="cacheConfiguration">
  <list>
   <bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="name" value="my_cache"/>
...
     <!-- Cache types metadata. -->
     <list>
      <bean class="org.apache.ignite.cache.CacheTypeMetadata">
        <!-- Type to query. -->
        <property name="valueType" value="org.apache.ignite.examples.datagrid.store.Person"/>
        <!-- Fields to be queried. -->
        <property name="queryFields">
        <map>
         <entry key="id" value="java.util.UUID"/>
         <entry key="orgId" value="java.util.UUID"/>
         <entry key="firstName" value="java.lang.String"/>
         <entry key="lastName" value="java.lang.String"/>
         <entry key="resume" value="java.lang.String"/>
         <entry key="salary" value="double"/>
        </map>
       </property>
        <!-- Fields to index in ascending order. -->
       <property name="ascendingFields">
        <map>
         <entry key="id" value="java.util.UUID"/>
         <entry key="orgId" value="java.util.UUID"/>
         <entry key="salary" value="double"/>
        </map>
       </property>
       <-- Fields to index as text. -->
       <property name="textFields">
        <list>
         <value>resume</value>
        </list>
      </bean>
     </list>
...
  </list>
 </property>
...
</bean>

Java:

CacheConfiguration ccfg = new CacheConfiguration();
....
Collection<CacheTypeMetadata> types = new ArrayList<>();

CacheTypeMetadata type = new CacheTypeMetadata();
type.setValueType(Person.class.getName());

Map<String, Class<?>> qryFlds = type.getQueryFields();
qryFlds.put("id", UUID.class);
qryFlds.put("orgId", UUID.class);
qryFlds.put("firstName", String.class);
qryFlds.put("lastName", String.class);
qryFlds.put("resume", String.class);
qryFlds.put("salary", double.class);

Map<String, Class<?>> ascFlds = type.getAscendingFields();
ascFlds.put("id", UUID.class);
ascFlds.put("orgId", UUID.class);
ascFlds.put("salary", double.class);

Collection<String> txtFlds = type.getTextFields();
txtFlds.add("resume");

types.add(type);
...
ccfg.setTypeMetadata(types);
...

注解和CacheTypeMetadata是互斥的。要看完整的例子,可以参照:CacheQueryTypeMetadataExample

3.7.SQL查询

Ignite对SQL查询的支持几乎没有任何限制,语法兼容于ANSI-99,可以使用任何SQL函数,聚合以及分组,Ignite会找出从哪里获得查询结果。 可以看下面的SQLQuery例子。

3.7.1.SQL关联

Ignite支持分布式的SQL关联,而且,如果数据位于不同的缓存,Ignite也允许跨缓存的关联。 在分区复制缓存之间进行关联也没有什么限制。然而,如果在两个分区缓存间关联,一点要确保要关联的键是并置的。 可以看下面的SQLQuery关联例子。

3.7.2.字段查询

不用查询整个对象,可以只选择特定的字段来减少网络和序列化的开销,为此,Ignite有一个字段查询的概念,当想执行一些聚合查询时这个也很有用。 可以看下面的SqlFieldsQuery例子。

3.7.3.跨缓存查询

可以从多个缓存中查询数据,这时缓存名称就充当了常规SQL的模式名称。这意味着所有的缓存都可以用带引号的缓存名称来称呼。查询创建的缓存充当了默认模式而且不需要显式地指定。 可以看下面的跨缓存SqlFieldsQuery例子。 SQLQuery:

IgniteCache<Long, Person> cache = ignite.cache("mycache");

SqlQuery sql = new SqlQuery(Person.class, "salary > ?");

// Find only persons earning more than 1,000.
try (QueryCursor<Entry<Long, Person>> cursor = cache.query(sql.setArgs(1000))) {
  for (Entry<Long, Person> e : cursor)
    System.out.println(e.getValue().toString());
}

SQLQuery关联:

IgniteCache<Long, Person> cache = ignite.cache("mycache");

// SQL join on Person and Organization.
SqlQuery sql = new SqlQuery(Person.class,
  "from Person, Organization "
  + "where Person.orgId = Organization.id "
  + "and lower(Organization.name) = lower(?)");

// Find all persons working for Ignite organization.
try (QueryCursor<Entry<Long, Person>> cursor = cache.query(sql.setArgs("Ignite"))) {
  for (Entry<Long, Person> e : cursor)
    System.out.println(e.getValue().toString());
}

SqlFieldsQuery:

IgniteCache<Long, Person> cache = ignite.cache("mycache");

// Select with join between Person and Organization.
SqlFieldsQuery sql = new SqlFieldsQuery(
  "select concat(firstName, ' ', lastName), Organization.name "
  + "from Person, Organization where "
  + "Person.orgId = Organization.id and "
  + "Person.salary > ?");

// Only find persons with salary > 1000.
try (QueryCursor<List<?>> cursor = cache.query(sql.setArgs(1000))) {
  for (List<?> row : cursor)
    System.out.println("personName=" + row.get(0) + ", orgName=" + row.get(1));
}

跨缓存SqlFieldsQuery:

// In this example, suppose Person objects are stored in a 
// cache named 'personCache' and Organization objects 
// are stored in a cache named 'orgCache'.

IgniteCache<Long, Person> personCache = ignite.cache("personCache");

// Select with join between Person and Organization to 
// get the names of all the employees of a specific organization.
SqlFieldsQuery sql = new SqlFieldsQuery(
    "select Person.name  "
        + "from Person, \"orgCache\".Organization where "
        + "Person.orgId = Organization.id "
        + "and Organization.name = ?");

// Execute the query and obtain the query result cursor.
try (QueryCursor<List<?>> cursor =  personCache.query(sql.setArgs("Ignite"))) {
    for (List<?> row : cursor)
        System.out.println("Person name=" + row);
}

3.7.4.通过注解配置SQL索引

索引可以在代码中通过使用@QuerySqlField注解进行配置。要告诉Ignite哪个类型应该被索引,要像下面的例子那样给CacheConfiguration.setIndexedTypes方法传入键值对。注意这个方法只接受类型对,一个是键类型一个是值类型,基本类型要以包装类型的形式传入。 Java:

CacheConfiguration<Object,Object> ccfg = new CacheConfiguration<>();

// Here we are setting 3 key-value type pairs to be indexed.
ccfg.setIndexedTypes(
  MyKey.class, MyValue.class,
  Long.class, MyOtherValue.class,
  UUID.class, String.class
);

使字段对于SQL查询可见 要使字段对于SQL查询可见,需要给字段加上@QuerySqlField注解。下面的例子中,字段age对于SQL不可见,要注意的是这些字段都没有索引。 Java:

public class Person implements Serializable {
  /** Will be visible in SQL. */
  @QuerySqlField
  private long id;

  /** Will be visible in SQL. */
  @QuerySqlField
  private String name;

  /** Will NOT be visible in SQL. */
  private int age;
}

Scala注解 在Scala类中,@QuerySqlField注解必须和@Field注解一起使用,这样的话这个字段对于Ignite才是可见的,就像这样的:@(QuerySqlField @field).

单列索引 要使字段对于SQL不仅可见还希望加快查询的速度,需要索引该字段的值,要创建一个单列索引,只需要给字段加上@QuerySqlField(index = true)注解即可。 Java:

public class Person implements Serializable {
  /** Will be indexed in ascending order. */
  @QuerySqlField(index = true)
  private long id;

  /** Will be visible in SQL, but not indexed. */
  @QuerySqlField
  private String name;

  /** Will be indexed in descending order. */
  @QuerySqlField(index = true, descending = true)
  private int age;
}

Scala注解 在Scala类中,@QuerySqlField注解必须和@Field注解一起使用,这样的话这个字段对于Ignite才是可见的,就像这样的:@(QuerySqlField @field).

分组索引 当查询条件复杂时可以使用多字段索引来加快查询的速度,这时可以用@QuerySqlField.Group注解。如果希望一个字段参与多个分组索引时也可以将多个@QuerySqlField.Group注解加入orderedGroups中。 在下面的分组索引的例子中,age字段加入了名为age_salary_idx的分组索引,他的分组序号是0并且降序排列,同一个分组索引中还有一个字段salary,他的分组序号是3并且升序排列。最重要的是salary字段还是一个单列索引(除了orderedGroups声明之外,还加上了index = true)。声明中的order不需要是什么特别的数值,他只是用于分组内的字段排序。 Java:

public class Person implements Serializable {
  /** Indexed in a group index with "salary". */
  @QuerySqlField(orderedGroups={@QuerySqlField.Group(
    name = "age_salary_idx", order = 0, descending = true)})
  private int age;

  /** Indexed separately and in a group index with "age". */
  @QuerySqlField(index = true, orderedGroups={@QuerySqlField.Group(
    name = "age_salary_idx", order = 3)})
  private double salary;
}

注意,将@QuerySqlField.Group放在@QuerySqlField(orderedGroups={...})外面是无效的。

3.7.5.通过CacheTypeMetadata配置SQL索引

索引和字段也可以通过org.apache.ignite.cache.CacheTypeMetadata进行配置,她便于利用Spring进行XML配置。细节可以参照javadoc,基本上来说他和@QuerySqlField是等价的。 XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
 <!-- Cache configuration. -->
 <property name="cacheConfiguration">
  <list>
   <bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="name" value="my_cache"/>
    <property name ="typeMetadata">
     <!-- Cache types metadata. -->
     <list>
      <bean class="org.apache.ignite.cache.CacheTypeMetadata">
        <!-- Type to query. -->
        <property name="valueType" value="org.apache.ignite.examples.datagrid.store.Person"/>
        <!-- Fields to be queried. -->
        <property name="queryFields">
        <map>
         <entry key="id" value="java.lang.Long"/>
         <entry key="orgId" value="java.util.UUID"/>
         <entry key="firstName" value="java.lang.String"/>
         <entry key="lastName" value="java.lang.String"/>
         <entry key="resume" value="java.lang.String"/>
         <entry key="salary" value="double"/>
        </map>
       </property>
        <!-- Fields to index in ascending order. -->
       <property name="ascendingFields">
        <map>
         <entry key="id" value="java.util.UUID"/>
         <entry key="orgId" value="java.util.UUID"/>
         <entry key="salary" value="double"/>
        </map>
       </property>
      </bean>
     </list>
...
  </list>
 </property>
...
</bean>

3.7.6.SQL查询如何工作

Ignite中处理查询主要有两种方式:

  1. 如果在复制缓存中执行查询,那么Ignite会假定所有数据都是本地有效的然后只是在H2数据库引擎中执行一个简单的查询,本地缓存中也是一样的。
  2. 如果在分区缓存中执行查询,他工作方式如下:查询会被解析并且拆分成多个Map查询和一个Reduce查询,然后所有的Map查询在分区缓存的所有数据节点上执行,再把结果提供给Reduce节点,他会在这些中间结果上依次执行Reduce查询。

3.7.7.使用执行计划

Ignite在SQL中支持EXPLAIN ...语法,Ignite中读取执行计划是分析查询性能的一个主要方式,注意计划游标会包含很多行:最后一个是Reduce节点查询的,其他的是Map节点的。 Java:

SqlFieldsQuery sql = new SqlFieldsQuery(
  "explain select name from Person where age = ?").setArgs(26); 

System.out.println(cache.query(sql).getAll());

执行计划本身是通过H2生成的,详细请参照:http://www.h2database.com/html/performance.html#explain_plan

3.7.8.使用H2调试控制台

当用Ignite进行开发时,有时对于检查表和视图是否正确或者运行在嵌入节点内部的H2数据库中的本地查询是非常有用的,为此Ignite提供了启动H2控制台的功能。要启用该功能,在启动节点时要将IGNITE_H2_DEBUG_CONSOLE系统属性或者环境变量设置为true。然后就可以在浏览器中打开控制台,可能需要点击控制台中的刷新按钮,因为有可能控制台在数据库对象初始化之前打开。

3.7.9.堆外SQL索引

Ignite支持将索引数据放在堆外内存,这个设计对于避免在堆上保存特别大的数据集导致频繁的垃圾回收以及不可预知的响应时间是很有用的。 当CacheConfiguration.setOffHeapMaxMemory()设置为-1时SQL索引会保存在堆上,否则会一直使用堆外索引。要注意这是打开和关闭堆外索引的唯一属性,例如CacheConfiguration.setMemoryMode()方法对于索引不起作用。 当打开堆外功能时要提高SQL查询的性能,可以试着增加CacheConfiguration.setSqlOnheapRowCacheSize()属性的值,他的默认值是10000. Java:

CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();

// Set unlimited off-heap memory for cache and enable off-heap indexes.
ccfg.setOffHeapMaxMemory(0); 

// Cache entries will be placed on heap and can be evited to off-heap.
ccfg.setMemoryMode(ONHEAP_TIERED);
ccfg.setEvictionPolicy(new RandomEvictionPolicy(100_000));

// Increase size of SQL on-heap row cache for off-heap indexes.
ccfg.setSqlOnheapRowCacheSize(100_000);

3.7.10.选择索引

在Ignite应用中使用索引时,有很多事情需要考虑:

  • 索引不是没有代价的。它消耗内存而且还需要单独进行更新,因此当使用较多的索引时缓存的更新性能也会下降,这个时候执行查询时如果选择了错误的索引优化器会犯更多的错误。

索引每个字段是错误的!

  • 索引只是有序数据结构,如果在字段(a,b,c)上定义了索引,那么记录的排序会是首先是a,然后b,再然后c。
  • 单字段索引不如以同一个字段开始的多字段分组索引(索引(a)不如索引(a,b,c)),因此要优先使用分组索引。

3.7.11.性能和易用性考量

当执行SQL查询时有一些常见的陷阱需要注意:

  1. 如果查询使用了操作符OR那么他可能不是以期望的方式使用索引。比如对于查询:select name from Person where sex='M' and (age = 20 or age = 30),age上的索引并不会生效,虽然它明显比sex上的索引选择性更强。要解决这个问题需要用UNION ALL重写这个查询(注意没有ALL的UNION会返回去重的行,这会改变查询的语意而且引入了额外的性能开销),就是这样的:select name from Person where sex='M' and age = 20 UNION ALL select name from Person where sex='M' and age = 30,这个方式索引就被正确使用了。
  2. 如果查询使用了操作符IN那么会有两个问题:无法提供可变参数列表(需要在查询中指定明确的列表,比如where id in (?, ?, ?),但是不能写where id in ?然后传入一个数组或者集合)并且查询无法使用索引,要解决这两个问题需要像这样重写查询:select p.name from Person p join table(id bigint = ?) i on p.id = i.id,这里可以提供一个任意长度的对象数组(Object[])作为参数,然后会在字段id上使用索引。注意基本类型数组(比如int[],long[]等)无法使用这个语法,但是可以使用基本类型的包装器。

3.8.持续查询

持续查询对于当执行一个查询之后希望持续地获得该查询结果数据更新的通知时,是很有用的。 持续查询是通过ContinuousQuery类实现的,他支持如下特性:

初始化查询

当执行持续查询时,有一个选项在开始监听更新之前执行初始化查询。初始化查询可以通过ContinuousQuery.setInitialQuery(Query)方法进行设置,并且可以是任意查询类型,包括扫描查询,SQL查询和文本查询。这个参数是可选的,并且如果不设置,这个功能是不会启用的。

远程过滤器

过滤器执行在给定键对应的主节点上,然后评估事件是否会被传播给监听器。如果监听器返回true,那么监听器就会收到通知,否则事件会被忽略。发生监听事件的节点会最大限度地减少监听通知的网络流量。远程过滤器可以通过ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>)方法进行设置。

本地监听器

每当事件通过远程过滤器,他们会被发送给客户端来通知本地的监听器,本地监听器是通过ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>)方法设置的。 Java8:

IgniteCache<Integer, String> cache = ignite.cache("mycache");

// Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

// Optional initial query to select all keys greater than 10.
qry.setInitialQuery(new ScanQuery<Integer, String>((k, v) -> k > 10)):

// Callback that is called locally when update notifications are received.
qry.setLocalListener((evts) -> 
  evts.stream().forEach(e -> System.out.println("key=" + e.getKey() + ", val=" + e.getValue())));

// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
qry.setRemoteFilter(e -> e.getKey() > 10);

// Execute query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
  // Iterate through existing data stored in cache.
  for (Cache.Entry<Integer, String> e : cur)
    System.out.println("key=" + e.getKey() + ", val=" + e.getValue());

  // Add a few more keys and watch a few more query notifications.
  for (int i = 5; i < 15; i++)
    cache.put(i, Integer.toString(i));
}

Java7:

IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);

// Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

qry.setInitialQuery(new ScanQuery<Integer, String>(new IgniteBiPredicate<Integer, String>() {
  @Override public boolean apply(Integer key, String val) {
    return key > 10;
  }
}));

// Callback that is called locally when update notifications are received.
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
  @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
    for (CacheEntryEvent<Integer, String> e : evts)
      System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
  }
});

// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() {
  @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
    return e.getKey() > 10;
  }
});

// Execute query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
  // Iterate through existing data.
  for (Cache.Entry<Integer, String> e : cur)
    System.out.println("key=" + e.getKey() + ", val=" + e.getValue());

  // Add a few more keys and watch more query notifications.
  for (int i = keyCnt; i < keyCnt + 10; i++)
    cache.put(i, Integer.toString(i));
}

3.9.事务

Ignite支持两种缓存模式,事务的原子的,在事务模式中可以在一个事务中组合多个缓存操作,而原子模式支持多个原子性操作,一次一个。原子模式更轻量并且通常比事务模式具有更好的性能。 然而,不管用那种模式,只要集群还是活的,不同集群节点之间的数据必须保持一致性。这意味着不管那个节点用于获取数据,他都不会得到部分提交或者与其他数据不一致的数据。

3.9.1.IgniteTransactions

IgniteTransactions接口包括了启动和结束事务的功能,以及订阅监听器或者获得指标数据。

跨缓存事务,可以在一个事务中组合来自不同节点的多个操作。注意他可以在一个事务中更新不同类型的缓存,比如复制分区缓存。

可以像下面这样获得IgniteTransactions的一个实例: Java:

try (Transaction tx = transactions.txStart()) {
    Integer hello = cache.get("Hello");

    if (hello == 1)
        cache.put("Hello", 11);

    cache.put("World", 22);

    tx.commit();
}

3.9.2.2阶段提交(2PC)

Ignite在事务中使用了2阶段提交的协议,但是只要适用也带有很多一阶段提交的优化。在一个事务中当数据更新时,Ignite会在本地事务映射中保持事务状态直到调用了commit()方法,在这一点,只要需要,数据都会被传输到远程节点。 对于Ignite2阶段提交是怎么工作的更多信息,可以参照如下博客:

ACID完整性 Ignite提供了完整的ACID(原子性,一致性,隔离性和持久性)兼容事务来确保一致性。

3.9.3.乐观和悲观

当配置了事务的原子模式,Ignite支持了事务的乐观悲观并发模型。主要的不同点是悲观锁是在访问时获得,而乐观锁是在提交阶段获得。 Ignite还支持如下的隔离级别:

  • 读提交:数据总是从主节点获得,尽管他已经在事务内被访问。
  • 可重复读:数据只是第一次访问时从主节点获得,然后就存储在本地事务映射中,之后所有对同一数据的访问都是本地化的。
  • 序列化:与乐观锁组合时当并发更新时事务会抛出TransactionOptimisticException异常。

Java:

IgniteTransactions txs = ignite.transactions();

// Start transaction in optimistic mode with repeatable read isolation level.
Transaction tx = txs.txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);

3.9.4.集成JTA

Ignite可以用一个JTA事务管理器搜索类TransactionConfiguration#setTxManagerLookupClassName进行配置,事务管理器索索基本上是一个工厂,他给Ignite提供了一个JTA事务管理器的实例。 设置了之后,在事务中的每一次缓存操作,Ignite都会检查是否存在一个进行中的JTA事务。如果JTA事务开启了,Ignite也会开启一个事务然后通过他自己的一个XAResource的内部实现来将其加入JTA事务,Ignite事务会准备,提交或者干脆回滚相应的JTA事务。 下面是一个在Ignite中使用JTA事务的例子: Java:

// Get an instance of JTA transaction manager.
TMService tms = appCtx.getComponent(TMService.class);

// Get an instance of Ignite cache.
IgniteCache<String, Integer> cache = cache();

UserTransaction jtaTx = tms.getUserTransaction();

// Start JTA transaction.
jtaTx.begin();

try {
    // Do some cache operations.
    cache.put("key1", 1);
    cache.put("key2", 2);

    // Commit the transaction.
    jtaTx.commit();
}
finally {
    // Rollback in a case of exception.
    if (jtaTx.getStatus() == Status.STATUS_ACTIVE)
        jtaTx.rollback();
}

3.9.5.原子性模式

Ignite在CacheAtomicityMode枚举中定义了2中原子性模式:

  • TRANSACTIONAL
  • ATOMIC

TRANSACTIONAL模式起用了完整的ACID兼容事务。然而,当只需要原子性的时候,建议使用ATOMIC模式已达到更好的性能。 ATOMIC模式通过避免事务锁来提供了更好的性能,但是仍然提供了数据的原子性和一致性。ATOMIC模式的另一个不同是批量写,比如putAll(...)removeAll(...)方法,并不在一个事务中执行并且允许部分失败。在部分失败时,会抛出CachePartialUpdateException异常,他包含了一个更新失败的键列表。

性能,注意使用ATOMIC模式时事务是禁用的,当不需要事务时他会获得更高的性能和吞吐量。

3.9.6.配置

原子模式是在CacheAtomicityMode枚举中定义的,可以通过CacheConfigurationatomicityMode属性进行配置。 原子性模式默认为ATOMIC

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <!-- Set a cache name. -->
            <property name="name" value="myCache"/>

            <!-- Set atomicity mode, can be ATOMIC or TRANSACTIONAL. -->
            <property name="atomicityMode" value="TRANSACTIONAL"/>
            ... 
        </bean>
    </property>

    <!-- Optional transaction configuration. -->
    <property name="transactionConfiguration">
        <bean class="org.apache.ignite.configuration.TransactionConfiguration">
            <!-- Configure TM lookup here. -->
        </bean>
    </property>
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Optional transaction configuration. Configure TM lookup here.
TransactionConfiguration txCfg = new TransactionConfiguration();

cfg.setTransactionConfiguration(txCfg);

// Start Ignite node.
Ignition.start(cfg);

3.10.堆外内存

当堆上的数据集特别大时,堆外内存通过将缓存数据保存在Java主堆空间之外来避免因为JVM长时间的垃圾回收导致的暂停,但是数据仍然在内存中。

堆外索引 注意当配置了堆外存储时,Ignite也会把查询索引保存在堆外,这意味着索引也不会占用任何的堆内空间。

堆外内存与多进程 也可以通过在同一台物理机上以小型堆开启多个进程来管理垃圾回收导致的停顿,然而,当使用复制缓存时这个做法是浪费的。因为对于开启的每个JVM进程,最终都会缓存完全一致的复制数据。

3.10.1.分层堆外存储

Ignite提供了分层存储模型,使数据可以在堆内堆外交换空间之间保存和移动。越往上的层提供了更大的存储能力,逐渐地延迟也会增加。 Ignite提供了三种存储模型,是在CacheMemoryMode中定义的。 |存储模型|描述| |---|---| |ONHEAP_TIERED|保存在堆内,退出到堆外以及可选的存储在交换空间| |OFFHEAP_TIERED|保存在堆外,避开堆内以及可选的退出到交换空间| |OFFHEAP_VALUES|将键存储在堆内,将值存储在堆外|

3.10.2.堆内层

Ignite中,ONHEAP_TIERED是默认的存储模型,所有的缓存数据都会存储在堆内,数据可以从堆内移动到堆外存储,如果配置了还可以移动到交换空间。 要配置ONHEAP_TIERED存储模型,需要:

  1. CacheConfigurationmemoryMode属性设置为ONHEAP_TIERED.
  2. 启动堆外内存(可选)
  3. 配置堆内内存的退出策略

XML:

<bean class="org.apache.ignite.configuration.CacheConfiguration">
  ...
  <!-- Store cache entries on-heap. -->
  <property name="memoryMode" value="ONHEAP_TIERED"/> 

  <!-- Enable Off-Heap memory with max size of 10 Gigabytes (0 for unlimited). -->
  <property name="offHeapMaxMemory" value="#{10 * 1024L * 1024L * 1024L}"/>

  <!-- Configure eviction policy. -->
  <property name="evictionPolicy">
    <bean class="org.apache.ignite.cache.eviction.fifo.CacheFifoEvictionPolicy">
      <!-- Evict to off-heap after cache size reaches maxSize. -->
      <property name="maxSize" value="100000"/>
    </bean>
  </property>
  ...
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED);

// Set off-heap memory to 10GB (0 for unlimited)
cacheCfg.setOffHeapMaxMemory(10 * 1024L * 1024L * 1024L);

CacheFifoEvictionPolicy evctPolicy = new CacheFifoEvictionPolicy();

// Store only 100,000 entries on-heap.
evctPolicy.setMaxSize(100000);

cacheCfg.setEvictionPolicy(evctPolicy);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

退出策略 注意如果在ONHEAP_TIERED模式中没有启用退出策略,数据是不会从堆内移动到堆外的。

3.10.3.堆外层

这个存储模型可以配置将缓存数据直接存储在堆外内存空间,避开堆内内存。如果所有数据都存储在堆外,那么就没有必要显式地配置退出策略。如果存储超过了限制(0为没有限制),那么LRU退出策略就会被用于将数据从堆外存储可选地移动到交换空间,如果配置了的话。 要配置OFFHEAP_TIERED存储模型,需要:

  1. CacheConfigurationmemoryMode属性设置为OFFHEAP_TIERED.
  2. 启用堆外存储(可选)

XML:

<bean class="org.apache.ignite.configuration.CacheConfiguration">
  ...
  <!-- Always store cache entries in off-heap memory. -->
  <property name="memoryMode" value="OFFHEAP_TIERED"/>

  <!-- Enable Off-Heap memory with max size of 10 Gigabytes (0 for unlimited). -->
  <property name="offHeapMaxMemory" value="#{10 * 1024L * 1024L * 1024L}"/>
  ...
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);

// Set off-heap memory to 10GB (0 for unlimited)
cacheCfg.setOffHeapMaxMemory(10 * 1024L * 1024L * 1024L);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.10.4.堆外值

这个存储模型可以将键存储在堆内,将值存储在堆外,这个存储模型适用于键较小和值较大的场景。 要配置OFFHEAP_VALUES存储模型,需要:

  1. CacheConfigurationmemoryMode属性设置为OFFHEAP_VALUES.
  2. 启用堆外存储
  3. 配置堆外存储的退出策略(可选)

XML:

<bean class="org.apache.ignite.configuration.CacheConfiguration">
  ...
  <!-- Always store cache entries in off-heap memory. -->
  <property name="memoryMode" value="OFFHEAP_VALUES"/>

  <!-- Enable Off-Heap memory with max size of 10 Gigabytes (0 for unlimited). -->
  <property name="offHeapMaxMemory" value="#{10 * 1024L * 1024L * 1024L}"/>
  ...
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setMemoryMode(CacheMemoryMode.OFFHEAP_VALUES);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.10.5.交换空间

当数据集超过了堆内和堆外存储的限值,需要配置交换空间以使Ignite可以将数据退出到磁盘而不是抛弃他们。

交换空间性能 因为交换空间是磁盘上的,所以性能会显著慢于堆内和堆外内存。

XML:

<bean class="org.apache.ignite.configuration.CacheConfiguration">
  ...
  <!-- Enable swap. -->
  <property name="swapEnabled" value="true"/> 
  ...
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setSwapEnabled(true);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.11.关系并置

鉴于分区缓存是最常见的缓存数据的方式,那么数据和计算以及数据和数据的并置就可以显著地提升性能和应用的可扩展性。

3.11.1.数据与数据的并置

在许多情况下,如果不同的缓存键被同时访问的话那么将他们并置在一起是很有利的。通常来说业务逻辑需要访问不止一个的缓存键,通过将他们并置在一起可以确保所有的键具有缓存在同一个处理节点上的同一个affinityKey,从而避免从远程节点获取数据的昂贵网络开销。 例如,有一个PersonCompany对象,然后希望将Person对象和其工作的Company对象并置在一起,为达到这个目的,缓存Person对象的实体需要有一个属性或者方法加注@AffinityKeyMapped注解,他提供了要并置的Company对象的键值。为了方便,也可以可选地使用AffinityKey类。 使用PersonKey:

public class PersonKey {
    // Person ID used to identify a person.
    private String personId;

    // Company ID which will be used for affinity.
    @AffinityKeyMapped
    private String companyId;
    ...
}

// Instantiate person keys with the same company ID which is used as affinity key.
Object personKey1 = new PersonKey("myPersonId1", "myCompanyId");
Object personKey2 = new PersonKey("myPersonId2", "myCompanyId");

Person p1 = new Person(personKey1, ...);
Person p2 = new Person(personKey2, ...);

// Both, the company and the person objects will be cached on the same node.
cache.put("myCompanyId", new Company(...));
cache.put(personKey1, p1);
cache.put(personKey2, p2);

使用AffinityKey:

Object personKey1 = new AffinityKey("myPersonId1", "myCompanyId");
Object personKey2 = new AffinityKey("myPersonId2", "myCompanyId");

Person p1 = new Person(personKey1, ...);
Person p2 = new Person(personKey2, ...);

// Both, the company and the person objects will be cached on the same node.
cache.put("myCompanyId", new Company(..));
cache.put(personKey1, p1);
cache.put(personKey2, p2);

SQL关联 当在分区缓存上的数据执行SQL分布式关联时,一定要确保关联的键是并置的。

3.11.2.数据和计算的并置

也有可能向缓存数据的节点发送计算,这是一个被称为数据和计算的并置的概念,他可以向特定的节点发送整个的工作单元。 要将数据和计算并置在一起,需要使用IgniteCompute.affinityRun(...)IgniteCompute.affinityCall(...)方法。 下面的例子显示了如何和上面提到的缓存PersonCompany对象的同一个集群节点进行并置计算:

Java8:

String companyId = "myCompanyId";

// Execute Runnable on the node where the key is cached.
ignite.compute().affinityRun("myCache", companyId, () -> {
  Company company = cache.get(companyId);

  // Since we collocated persons with the company in the above example,
  // access to the persons objects is local.
  Person person1 = cache.get(personKey1);
  Person person2 = cache.get(personKey2);
  ...  
});

Java7:

inal String companyId = "myCompanyId";

// Execute Runnable on the node where the key is cached.
ignite.compute().affinityRun("myCache", companyId, new IgniteRunnable() {
  @Override public void run() {
    Company company = cache.get(companyId);

    Person person1 = cache.get(personKey1);
    Person person2 = cache.get(personKey2);
    ...
  }
};

3.11.3.IgniteCompute和EntryProcessor

IgniteCompute.affinityRun(...)IgniteCache.invoke(...)方法都提供了数据和计算的并置。主要的不同在于invoke(...)方法是原子的并且执行时在键上加了锁,无法从EntryProcessor逻辑内部访问其他的键,因为它会触发一个死锁。 另一方面,又不持有任何锁,比如,在这些方法内开启多个事务或者执行缓存查询是绝对合法的,不用担心死锁。这时Ignite会自动检测处理是并置的然后对事务采用优化过的一阶段提交而不是二阶段提交。

关于IgniteCache.invoke(...)方法的更多信息,请参照3.2.超越JCache文档。

3.12.持久化存储

JCache附带了javax.cache.integration.CacheLoaderjavax.cache.integration.CacheWriterAPI,他们分别用于底层持久化存储的通读通写(比如RDBMS中的Oracle或者MySQL,以及NoSQL数据库中的MongoDB或者CouchDB)。 虽然Ignite可以单独地配置CacheRLoaderCacheWriter,但是在两个单独的类中实现事务化存储是非常尴尬的,因为多个loadput操作需要在同一个事务中的同一个连接中共享状态。为了缓解这个问题,Ignite提供了·org.apacche.ignite.cache.store.CacheStore·接口,他同时扩展了CacheLoaderCacheWriter

事务 CacheStore是完整事务性的,他会自动地融入当前的缓存事务。 CacheJdbcPojoStore Ignite附带了他自己的CacheJdbcPojoStore,他会自动地建立Java POJO和数据库模式之间的映射,可以参照3.13.自动持久化章节

3.12.1.通读和通写

当希望通读和通写时,提供一个正常的缓存存储的实现是很重要的。通读意味着当缓存无效时会从持久化存储中读取,通写意味着当缓存更新时会自动地进行持久化。所有的通读和通写都会参与整体的缓存事务以及作为一个整体提交或者回滚。 要配置通读和通写,需要实现CacheStore接口以及设置CacheConfigurationcacheStoreFactoryreadThroughwriteThrough属性,下面的例子会有说明。

3.12.2.后写缓存

在一个简单的通写模式中每个缓存的put和remove操作都会涉及一个持久化存储的请求,因此整个缓存更新的持续时间可能是相对比较长的。另外,密集的缓存更新频率也会导致非常高的存储负载。 对于这种情况,Ignite提供了一个选项来执行异步化的持久化存储更新,也叫做后写,这个方法的主要概念是累加更新操作然后作为一个批量操作异步化地刷新入持久化存储中。真实的数据持久化可以被基于时间的事件触发(数据输入的最大时间受到队列的限制),也可以被队列的大小触发(当队列大小达到一个限值后队列就被刷新了),或者通过两者的组合触发,这时任何事件都会触发刷新。

更新顺序 对于后写的方式只有数据的最后一次更新会被写入底层存储。如果键为key1的缓存数据分别依次地更新为值value1、value2和value3,那么只有(key1,value3)对这一个存储请求会被传播到持久化存储。 更新性能 批量的存储操作通常比按顺序的单一存储操作更有效率,因此可以通过开启后写模式的批量操作来利用这个特性。简单类型(put和remove)的简单顺序更新操作可以被组合成一个批量操作。比如,连续地往缓存中加入(key1,value1),(key2,value2),(key3,value3)可以通过一个单一的CacheStore.putAll(...)操作批量处理。

后写缓存可以通过CacheConfiguration.setWriteBehindEnabled(boolean)配置项来开启,下面的3.12.6.配置章节显示了一个完整的配置属性列表来进行后写缓存行为的定制。

3.12.3.CacheStore

Ignite中的CacheStore接口用于向底层的数据存储写入或者加载数据。除了标准的JCache加载和存储方法,他还引入了最终事务划界以及从底层数据存储批量载入数据的能力。

loadCache()

CacheStore.loadCache()方法可以加载缓存,即使没有传入要加载的所有键,它通常用于启动时缓存的热加载,但是也可以在缓存加载完之后的任何时间点调用。 在每一个相关的集群节点,IgniteCache.loadCache()方法会分配给CacheStore.loadCache()方法,如果只想在本地节点上进行加载,可以用IgniteCache.localLoadCache()方法。

对于分区缓存,不管是主节点还是备份节点,如果键没有被映射到该节点,会被缓存自动丢弃。

load(), write(), delete()

IgniteCache接口的get,put,remove方法被调用时,相对应的CacheStoreload(),write()delete()方法会被调用,当与单个缓存数据工作时,这些方法会用于启用通读通写行为。

loadAll(), writeAll(), deleteAll()

IgniteCache接口的getAll,putAll,removeAll方法被调用时,相对应的CacheStoreloadAll(),writeAll()deleteAll()方法会被调用,当与多个缓存数据工作时,这些方法会用于启用通读通写行为,他们通常用批量操作的方式实现以提供更好的性能。

CacheStoreAdapter提供了loadAll(),writeAll()deleteAll()方法的默认实现,他只是简单地对键进行一个一个地迭代。

sessionEnd()

Ignite有一个存储会话的概念,他可以跨越不止一个的缓存存储操作,会话对于事务非常有用。 对于原子化的缓存,sessionEnd()方法会在每个CacheStore方法完成之后被调用,对于事务化的缓存,不管是在底层持久化存储进行提交或者回滚多个操作,sessionEnd()方法都会在每个事务结束后被调用。

CacheStoreAdapater提供了sessionEnd()方法的默认的空实现。

3.12.4.CacheStoreSession

缓存存储会话的主要目的是当CacheStore用于事务中时在多个存储操作中持有一个上下文。比如,如果使用JDBC,可以通过CacheStoreSession.attach()方法保存数据库的连接,然后可以在CacheStore.sessionEnd(boolean)方法中提交这个连接。 CacheStoreSession可以通过@GridCacheStoreSessionResource注解注入自定义的缓存存储实现中。

3.12.5.CacheStore示例

下面是几个不同场景的缓存存储的实现,注意事务化的实现用还是没用事务。 JDBC非事务:

public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  // This mehtod is called whenever "get(...)" methods are called on IgniteCache.
  @Override public Person load(Long key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
        st.setLong(1, key);

        ResultSet rs = st.executeQuery();

        return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load: " + key, e);
    }
  }

  // This mehtod is called whenever "put(...)" methods are called on IgniteCache.
  @Override public void write(Cache.Entry<Long, Person> entry) {
    try (Connection conn = connection()) {
      // Syntax of MERGE statement is database specific and should be adopted for your database.
      // If your database does not support MERGE statement then use sequentially update, insert statements.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        for (Cache.Entry<Long, Person> entry : entries) {
          Person val = entry.getValue();

          st.setLong(1, entry.getKey());
          st.setString(2, val.getFirstName());
          st.setString(3, val.getLastName());

          st.executeUpdate();
        }
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
    }
  }

  // This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
  @Override public void delete(Object key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        st.setLong(1, (Long)key);

        st.executeUpdate();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to delete: " + key, e);
    }
  }

  // This mehtod is called whenever "loadCache()" and "localLoadCache()"
  // methods are called on IgniteCache. It is used for bulk-loading the cache.
  // If you don't need to bulk-load the cache, skip this method.
  @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
    if (args == null || args.length == 0 || args[0] == null)
      throw new CacheLoaderException("Expected entry count parameter is not provided.");

    final int entryCnt = (Integer)args[0];

    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
        try (ResultSet rs = st.executeQuery()) {
          int cnt = 0;

          while (cnt < entryCnt && rs.next()) {
            Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

            clo.apply(person.getId(), person);

            cnt++;
          }
        }
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load values from cache store.", e);
    }
  }

  // Open JDBC connection.
  private Connection connection() throws SQLException  {
    // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
    // In this example we use H2 Database for simplification.
    Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");

    conn.setAutoCommit(true);

    return conn;
  }
}

JDBC事务:

public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  /** Auto-injected store session. */
  @CacheStoreSessionResource
  private CacheStoreSession ses;

  // Complete transaction or simply close connection if there is no transaction.
  @Override public void sessionEnd(boolean commit) {
    try (Connection conn = ses.getAttached()) {
      if (conn != null && ses.isWithinTransaction()) {
        if (commit)
          conn.commit();
        else
          conn.rollback();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to end store session.", e);
    }
  }

  // This mehtod is called whenever "get(...)" methods are called on IgniteCache.
  @Override public Person load(Long key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where id=?")) {
        st.setLong(1, key);

        ResultSet rs = st.executeQuery();

        return rs.next() ? new Person(rs.getLong(1), rs.getString(2), rs.getString(3)) : null;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load: " + key, e);
    }
  }

  // This mehtod is called whenever "put(...)" methods are called on IgniteCache.
  @Override public void write(Cache.Entry<Long, Person> entry) {
    try (Connection conn = connection()) {
      // Syntax of MERGE statement is database specific and should be adopted for your database.
      // If your database does not support MERGE statement then use sequentially update, insert statements.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        for (Cache.Entry<Long, Person> entry : entries) {
          Person val = entry.getValue();

          st.setLong(1, entry.getKey());
          st.setString(2, val.getFirstName());
          st.setString(3, val.getLastName());

          st.executeUpdate();
        }
      }
    }        
    catch (SQLException e) {
      throw new CacheWriterException("Failed to write [key=" + key + ", val=" + val + ']', e);
    }
  }

  // This mehtod is called whenever "remove(...)" methods are called on IgniteCache.
  @Override public void delete(Object key) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        st.setLong(1, (Long)key);

        st.executeUpdate();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to delete: " + key, e);
    }
  }

  // This mehtod is called whenever "loadCache()" and "localLoadCache()"
  // methods are called on IgniteCache. It is used for bulk-loading the cache.
  // If you don't need to bulk-load the cache, skip this method.
  @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
    if (args == null || args.length == 0 || args[0] == null)
      throw new CacheLoaderException("Expected entry count parameter is not provided.");

    final int entryCnt = (Integer)args[0];

    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
        try (ResultSet rs = st.executeQuery()) {
          int cnt = 0;

          while (cnt < entryCnt && rs.next()) {
            Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

            clo.apply(person.getId(), person);

            cnt++;
          }
        }
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load values from cache store.", e);
    }
  }

  // Opens JDBC connection and attaches it to the ongoing
  // session if within a transaction.
  private Connection connection() throws SQLException  {
    if (ses.isWithinTransaction()) {
      Connection conn = ses.getAttached();

      if (conn == null) {
        conn = openConnection(false);

        // Store connection in the session, so it can be accessed
        // for other operations within the same transaction.
        ses.attach(conn);
      }

      return conn;
    }
    // Transaction can be null in case of simple load or put operation.
    else
      return openConnection(true);
  }

  // Opens JDBC connection.
  private Connection openConnection(boolean autocommit) throws SQLException {
    // Open connection to your RDBMS systems (Oracle, MySQL, Postgres, DB2, Microsoft SQL, etc.)
    // In this example we use H2 Database for simplification.
    Connection conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1");

    conn.setAutoCommit(autocommit);

    return conn;
  }
}

JDBC批量操作

public class CacheJdbcPersonStore extends CacheStore<Long, Person> {
  // Skip single operations and open connection methods.
  // You can copy them from jdbc non-transactional or jdbc transactional examples.
  ...

  // This mehtod is called whenever "getAll(...)" methods are called on IgniteCache.
  @Override public Map<K, V> loadAll(Iterable<Long> keys) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement(
        "select firstName, lastName from PERSONS where id=?")) {
        Map<K, V> loaded = new HashMap<>();

        for (Long key : keys) {
          st.setLong(1, key);

          try(ResultSet rs = st.executeQuery()) {
            if (rs.next())
              loaded.put(key, new Person(key, rs.getString(1), rs.getString(2));
          }
        }

        return loaded;
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to loadAll: " + keys, e);
    }
  }

  // This mehtod is called whenever "putAll(...)" methods are called on IgniteCache.
  @Override public void writeAll(Collection<Cache.Entry<Long, Person>> entries) {
    try (Connection conn = connection()) {
      // Syntax of MERGE statement is database specific and should be adopted for your database.
      // If your database does not support MERGE statement then use sequentially update, insert statements.
      try (PreparedStatement st = conn.prepareStatement(
        "merge into PERSONS (id, firstName, lastName) key (id) VALUES (?, ?, ?)")) {
        for (Cache.Entry<Long, Person> entry : entries) {
          Person val = entry.getValue();

          st.setLong(1, entry.getKey());
          st.setString(2, val.getFirstName());
          st.setString(3, val.getLastName());

          st.addBatch();
        }

        st.executeBatch();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to writeAll: " + entries, e);
    }
  }

  // This mehtod is called whenever "removeAll(...)" methods are called on IgniteCache.
  @Override public void deleteAll(Collection<Long> keys) {
    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("delete from PERSONS where id=?")) {
        for (Long key : keys) {
          st.setLong(1, key);

          st.addBatch();
        }

        st.executeBatch();
      }
    }
    catch (SQLException e) {
      throw new CacheWriterException("Failed to deleteAll: " + keys, e);
    }
  }
}

3.12.6.配置

下面的配置参数可以通过CacheConfiguration用于启用以及配置通写缓存:

setter方法 描述 默认值
setWriteBehindEnabled(boolean) 设置后写是否启用的标志 false
setWriteBehindFlushSize(int) 后写缓存的最大值,如果超过了这个限值,所有的缓存数据都会被刷入缓存存储然后写缓存被清空。如果值为0,刷新操作将会依据刷新频率间隔,注意不能将写缓存大小和刷新频率都设置为0 10240
setWriteBehindFlushFrequency(long) 后写缓存的刷新频率,单位为毫秒,该值定义了从对缓存对象进行插入/删除和当相应的操作被施加到缓存存储的时刻之间的最大时间间隔。如果值为0,刷新会依据写缓存大小,注意不能将写缓存大小和刷新频率都设置为0 5000
setWriteBehindFlushThreadCount(int) 执行缓存刷新的线程数 1
setWriteBehindBatchSize(int) 后写缓存存储操作的操作数最大值 512

CacheStore接口可以在IgniteConfiguration上通过一个工厂进行设置,就和CacheLoaderCacheWriter同样的方式。

对于分布式缓存配置的factory应该是序列化的。

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          ...
          <property name="cacheStoreFactory">
            <bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory">
              <constructor-arg>
                <bean class="foo.bar.MyPersonStore">
                  ...
                </bean>
              </constructor-arg>
            </bean>
          </property>
          ...
        </bean>
      </list>
    </property>
  ...
</bean>

Java:

IgniteConfiguration cfg = new IgniteConfiguration();

CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<>();

CacheStore<Long, Person> store;

store = new MyPersonStore();

cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<>(store));
cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.12.7.CacheJdbcBlobStore

CacheJdbcBlobStore实现基于JDBC,这个实现将对象以BLOB的格式存储在底层数据库中。存储会在数据库中创建名为ENTRIES的表来存储数据,表具有key和val两个字段。 如果提供了定制的DDL和DML语句,表和字段的名字要和所有的语句一致以及参数的顺序也要保留。 使用CacheJdbcBlobStoreFactory工厂来向CacheConfiguration传入CacheJdbcBlobStore:

Spring:

<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
   <property name="cacheConfiguration">
     <list>
       <bean class="org.apache.ignite.configuration.CacheConfiguration">
         ...
           <property name="cacheStoreFactory">
             <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcBlobStoreFactory">
               <property name="user" value = "user" />
               <property name="dataSourceBean" value = "simpleDataSource" />
             </bean>
           </property>
       </bean>
      </list>
    </property>
  ...
</bean>

3.12.8.CacheJdbcPojoStore

CacheJdbcPojoStore实现基于JDBC和基于反射的POJO,这个实现将对象用基于反射的Java Bean映射描述的形式存储在底层数据库中。 使用CacheJdbcPojoStoreFactory工厂来向CacheConfiguration传入CacheJdbcPojoStore: Spring:

<bean id= "simpleDataSource" class="org.h2.jdbcx.JdbcDataSource"/>

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          ...
            <property name="cacheStoreFactory">
              <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
                <property name="dataSourceBean" value = "simpleDataSource" />
              </bean>
            </property>
        </bean>
      </list>
    </property>
</bean>

3.12.9.CacheHibernateBlobStore

CacheHibernateBlobStore实现基于Hibernate,这个实现将对象以BLOB的格式存储在底层数据库中。 使用CacheHibernateBlobStoreFactory工厂来向CacheConfiguration传入CacheHibernateBlobStore: Spring:

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
  ...
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <bean class="org.apache.ignite.cache.store.hibernate.CacheHibernateBlobStoreFactory">
           <property name="hibernateProperties">
             <props>
               <prop key="connection.url">jdbc:h2:mem:</prop>
               <prop key="hbm2ddl.auto">update</prop>
               <prop key="show_sql">true</prop>
             </props>
           </property>
         </bean>
       </list>
    </property>
  ...    
</bean>

3.13.自动持久化

Ignite附带了自己的数据库模式映射向导,他提供了与持久化存储集成的自动支持。这个工具自动连接底层数据库然后生成所有必要的XML映射文件还有Java领域模型对象POJOs。 Ignite还附带了org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore,这是即开即用的gniteCacheStore接口的JDBC实现,然后自动地处理通读和通写逻辑。

3.13.1.数据库模式导入

要启动生成数据库模式映射向导,可以执行bin/ignite-schema-import.sh脚本:

$ bin/ignite-schema-import.sh

这个命令会弹出一个界面向导。

数据库连接 第一个界面需要填入数据连接配置信息,注意工具没有提供JDBC驱动需要单独提供。

生成XML配置和POJOs 第二个界面可以将数据库表映射为领域模型类,然后自动生成XML配置和POJOs。

生成代码 工具会生成如下的代码:

  • Java POJO key类和value类
  • XMLCacheTypeMetadata配置
  • ConfigurationSnippet.java(与XML二选一)

从向导退出后,需要做:

  1. 将生成的代码目录拷贝进工程的源代码目录;
  2. CacheTypeMetadata部分的XML配置声明拷贝进Ignite的XML配置文件的CacheConfiguration项目下;
  3. 在Ignite初始化逻辑中使用ConfigurationSnippet.java.

3.13.2.CacheJdbcPojoStore

在通过向导生成XML和POJO代码以及将所有必要的代码片段拷贝进项目之后,就可以用CacheJdbcPojoStore来从数据库往缓存内加载数据以及从缓存将数据写入数据库。 首先需要在配置中正确地声明存储:

XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
 <!-- Cache configuration. -->
 <property name="cacheConfiguration">
  <list>
   <bean class="org.apache.ignite.configuration.CacheConfiguration">
    ...
    <!-- Cache store. -->
    <property name="cacheStoreFactory">
     <bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory">
      <constructor-arg>
       <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore">
        <property name="dataSource">
         <!-- Sample datasource: in-memory H2 database -->
         <property name="dataSource">
          <bean class="org.h2.jdbcx.JdbcConnectionPool" factory-method="create">
           <constructor-arg value="jdbc:h2:tcp://localhost/mem:ExampleDb"/>
           <constructor-arg value="sa"/>
           <constructor-arg value=""/>
          </bean>
         </property>
        </property>
       </bean>
      </constructor-arg>
     </bean>
    </property>

     <!-- Cache types metadata. -->
    <property name="typeMetadata">
     <list>
     ... Copy here types generated by wizard...
     </list>
    </property>

    <!-- Enable store usage. --> 
    <!-- Sets flag indicating whether read from database is enabled. -->
    <property name="readThrough" value="true"/>

    <!-- Sets flag indicating whether write to database is enabled. -->
    <property name="writeThrough" value="true"/>

    <!-- Enable database batching. --> 
    <!-- Sets flag indicating whether write-behind is enabled. -->
    <property name="writeBehindEnabled" value="true"/>
  </list>
 </property>
...
</bean>

Java:

IgniteConfiguration cfg = new IgniteConfiguration();
...
CacheConfiguration ccfg = new CacheConfiguration<>();

// Create data source for your database.
// For example: in-memory H2 database.
DataSource dataSource = org.h2.jdbcx.JdbcConnectionPool.create("jdbc:h2:tcp://localhost/mem:ExampleDb", "sa", "");

// Create store. 
CacheJdbcPojoStore store = new CacheJdbcPojoStore();
store.setDataSource(dataSource);

// Create store factory. 
ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<>(store));

// Configure cache to use store. 
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);

// Enable database batching.
ccfg.setWriteBehindEnabled(true);

cfg.setCacheConfiguration(ccfg);

// Configure cache types. 
Collection<CacheTypeMetadata> meta = new ArrayList<>();
... Paste here code generated by wizard ...
...
// Start Ignite node.
Ignition.start(cfg);

3.12.持久化存储中定义的所有操作在CacheJdbcPojoStore中都有效。 CacheJdbcPojoStore可以有效地加载数据,要从数据库中加载缓存配置中注册的所有类型的数据,只需要调用IgniteCache.loadCache(null)即可。要加载特定条件的数据,需要向IgniteCache.loadCache()中传入键类型以及SQL查询,比如:

IgniteCache<Long, Person> c = node.jcache(CACHE_NAME);
c.loadCache(null, "java.lang.Integer", "select * from Person where id > 100");

3.13.3.示例

示例org.apache.ignite.examples.datagrid.store.auto.CacheAutoStoreExample演示了缓存存储的使用。 要运行自动持久化的例子,需要设置CacheNodeWithStoreStartup.STORE = AUTO以及运行CacheAutoStoreExample

3.13.4.演示

下面是一步一步地演示,可以从examples/schema-import文件夹中找到演示的源代码。 演示要使用H2数据库

  • 用如下的脚本启动H2数据库服务:
    $  examples/schema-import/bin/h2-server.sh
    
  • 通过浏览器打开H2的控制台,按照下图所示连接入H2:
  • examples/schema-import/bin/db-init.sql的内容粘贴进H2控制台然后执行;
  • 通过演示的配置属性开启模式导入向导。

    bin/ignite-schema-import.sh examples/schema-import/bin/schema-import.properties
    

  • 点击下一步:

  • 点击Generate,然后回答Yes来覆盖警告。

POJO文件以及java片段会在examples/schema-import/src/main/java/org/apache/ignite/schema文件夹中生成。

  • examples/schema-import/pom.xml导入喜欢的IDE
  • 运行Demo.java文件。

3.14.数据加载

数据加载通常用于启动时初始化缓存数据,用标准的缓存put(...)putAll(...)操作通常加载大量的数据是比较低效的。

3.14.1.IgniteDataStreamer

数据流处理器是通过IgniteDataStreamerAPI定义的,他可以将大量的连续数据注入Ignite缓存。数据流处理器以可扩展和容错的方式在数据被发送到集群节点之前通过把定量数据放在一起以获得高性能。

数据流处理器可以用于任何时候将大量数据载入缓存,包括启动时的预加载。

想了解更多信息请参照4.2.数据流处理器

3.14.2.IgniteCache.loadCache()

将大量数据载入缓存的另一个方法是通过CacheStore.loadCache()方法,他可以在不传入要加载的所有键的情况下进行缓存的数据加载。 在所有保存该缓存的每一个集群节点上IgniteCache.loadCache()方法会委托给CacheStore.loadCache()方法,如果只想在本地节点上加载,可以用IgniteCache.localLoadCache()方法。

对于分区缓存,如果键没有映射到某个节点,不管是主节点还是备份节点,都会被自动忽略。

下面是一个CacheStore.loadCache()实现的例子,对于CacheStore的完整例子,可以参照3.12.持久化存储章节。

public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  ...
  // This mehtod is called whenever "IgniteCache.loadCache()" or
  // "IgniteCache.localLoadCache()" methods are called.
  @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
    if (args == null || args.length == 0 || args[0] == null)
      throw new CacheLoaderException("Expected entry count parameter is not provided.");

    final int entryCnt = (Integer)args[0];

    Connection conn = null;

    try (Connection conn = connection()) {
      try (PreparedStatement st = conn.prepareStatement("select * from PERSONS")) {
        try (ResultSet rs = st.executeQuery()) {
          int cnt = 0;

          while (cnt < entryCnt && rs.next()) {
            Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

            clo.apply(person.getId(), person);

            cnt++;
          }
        }
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load values from cache store.", e);
    }
  }
  ...
}

数据加载的分区感知 在上面描述的场景中同样的查询会在所有节点上执行,每个节点会迭代所有的结果集,忽略掉不属于该节点的所有键,效率不是很高。 如果数据库中的每条记录都保存分区ID的话这个情况会有所改善。可以通过org.apache.ignite.cache.affinity.Affinity接口来获得要存储在缓存中的任何键的分区ID。 下面的代码片段可以获得每个要存储在缓存中的Person对象的分区ID。

IgniteCache cache = ignite.cache(cacheName);
Affinity aff = ignite.affinity(cacheName);

for (int personId = 0; personId < PERSONS_CNT; personId++) {
    // Get partition ID for the key under which person is stored in cache.
    int partId = aff.partition(personId);

    Person person = new Person(personId);
    person.setPartitionId(partId);
    // Fill other fields.

    cache.put(personId, person);
}

当Person对象知道自己的分区ID,每个节点就可以只查询属于自己所属分区的数据。要做到这一点,可以将一个Ignite实例注入到自己的CacheStore,然后用它来确定本地节点所属的分区。 下面的代码片段演示了用Affinity来只加载本地分区的数据,注意例子代码是单线程的,然而它可以通过分区ID高效地并行化。

public class CacheJdbcPersonStore extends CacheStoreAdapter<Long, Person> {
  // Will be automatically injected.
  @IgniteInstanceResource
  private Ignite ignite;

  ...
  // This mehtod is called whenever "IgniteCache.loadCache()" or
  // "IgniteCache.localLoadCache()" methods are called.
  @Override public void loadCache(IgniteBiInClosure<Long, Person> clo, Object... args) {
    Affinity aff = ignite.affinity(cacheName);
    ClusterNode locNode = ignite.cluster().localNode();

    try (Connection conn = connection()) {
      for (int part : aff.primaryPartitions(locNode))
        loadPartition(conn, part, clo);

      for (int part : aff.backupPartitions(locNode))
        loadPartition(conn, part, clo);
    }
  }

  private void loadPartition(Connection conn, int part, IgniteBiInClosure<Long, Person> clo) {
    try (PreparedStatement st = conn.prepareStatement("select * from PERSONS where partId=?")) {
      st.setInt(1, part);

      try (ResultSet rs = st.executeQuery()) {
        while (rs.next()) {
          Person person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3));

          clo.apply(person.getId(), person);
        }
      }
    }
    catch (SQLException e) {
      throw new CacheLoaderException("Failed to load values from cache store.", e);
    }
  }
  ...
}

注意键和分区的映射依赖于affinity函数中配置的分区数量(参照org.apache.ignite.cache.affinity.AffinityFunctio)。如果affinity函数配置改变,数据库中存储的分区ID必须响应地更新。

3.15.退出策略

退出策略控制着堆内内存中缓存可以存储的元素的最大值,如果可能的话当达到堆内缓存的最大值时,数据会被退出到堆外空间。 部分退出策略支持批量退出以及受到内存大小限制而退出。 如果批量退出启用,那么当缓存大小加上batchSize数量大于最大缓存大小时退出就开始了,这时batchSize数量的元素会被退出。 如果因为内存大小限制而退出启用,那么当以字节数计算的缓存数据大小超过内存大小最大值时退出就会发生。

批量退出只有当最大内存限制没有赋值时才会被支持。

Ignite中退出策略是可插拔的,通过EvictionPolicy接口来控制。一个退出策略的实现会在每个缓存数据发生变化时收到通知,他定义了从缓存中选择要退出数据的算法。

如果数据集可以装入内存,那么退出策略不会带来任何好处,因此可以关闭,这也是默认值。

3.15.1.最近最少使用(LRU)

LRU退出策略基于最近最少使用算法,他会确保最近最少使用的数据(即最久没有被访问的数据)会被首先退出。 支持批量退出以及受到内存大小限制的退出。

LRU退出策略适用于缓存的大多数使用场景。

这个策略通过LruEvictionPolicy实现,通过CacheConfiguration进行配置。

XML:

<bean class="org.apache.ignite.cache.CacheConfiguration">
  <property name="name" value="myCache"/>
    ...
    <property name="evictionPolicy">
        <!-- LRU eviction policy. -->
        <bean class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
            <!-- Set the maximum cache size to 1 million (default is 100,000). -->
            <property name="maxSize" value="1000000"/>
        </bean>
    </property>
    ...
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

// Set the maximum cache size to 1 million (default is 100,000).
cacheCfg.setEvictionPolicy(new LruEvictionPolicy(1000000));

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.15.2.先进先出(FIFO)

FIFO退出策略基于先进先出算法,他确保缓存中保存时间最久的数据会被首先退出,他与ruEvictionPolicy不同,因为他忽略了数据的访问顺序。 支持批量退出以及受到内存大小限制的退出。 这个策略通过FifoEvictionPolicy实现,通过CacheConfiguration进行配置。

XML:

<bean class="org.apache.ignite.cache.CacheConfiguration">
  <property name="name" value="myCache"/>
    ...
    <property name="evictionPolicy">
        <!-- FIFO eviction policy. -->
        <bean class="org.apache.ignite.cache.eviction.fifo.FifoEvictionPolicy">
            <!-- Set the maximum cache size to 1 million (default is 100,000). -->
            <property name="maxSize" value="1000000"/>
        </bean>
    </property>
    ...
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

// Set the maximum cache size to 1 million (default is 100,000).
cacheCfg.setEvictionPolicy(new FifoEvictionPolicy(1000000));

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.15.3.有序

有序退出策略和FIFO退出策略很像,不同点在于通过默认或者用户定义的比较器定义了数据的顺序,然后确保最小的数据(即排序数值最小的数据)会被退出。 默认的比较器用缓存条目的键作为比较器,他要求键必须实现Comparable接口。也可以提供自定义的比较器实现,可以通过键,值或者两者都用来进行条目的比较。 支持批量退出以及受到内存大小限制的退出。 这个策略通过SortedEvictionPolicy实现,通过CacheConfiguration进行配置。

XML:

<bean class="org.apache.ignite.cache.CacheConfiguration">
  <property name="name" value="myCache"/>
    ...
    <property name="evictionPolicy">
        <!-- Sorted eviction policy. -->
        <bean class="org.apache.ignite.cache.eviction.sorted.SortedEvictionPolicy">
            <!-- Set the maximum cache size to 1 million (default is 100,000) and use default comparator. -->
            <property name="maxSize" value="1000000"/>
        </bean>
    </property>
    ...
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

// Set the maximum cache size to 1 million (default is 100,000).
cacheCfg.setEvictionPolicy(new SortedEvictionPolicy(1000000));

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.15.4.随机

随机退出策略会随机地选择条目退出,这个退出策略主要用于调试或者基准测试的目的。 这个策略通过RandomEvictionPolicy实现,通过CacheConfiguration进行配置。

XML:

<bean class="org.apache.ignite.cache.CacheConfiguration">
  <property name="name" value="myCache"/>
    ...
    <property name="evictionPolicy">
        <!-- Random eviction policy. -->
        <bean class="org.apache.ignite.cache.eviction.random.RandomEvictionPolicy">            <!-- Set the maximum cache size to 1 million (default is 100,000). -->
            <property name="maxSize" value="1000000"/>
        </bean>
    </property>
    ...
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setName("cacheName");

// Set the maximum cache size to 1 million (default is 100,000).
cacheCfg.setEvictionPolicy(new RandomEvictionPolicy(1000000));

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.16.过期策略

过期策略指定了在缓存条目过期之前必须经过的时间量,时间可以从创建,最后访问或者修改时间开始计算。 过期策略可以通过任何预定义的ExpiryPolicy实现进行设置。

类名 创建时间 最后访问时间 最后更新时间
CreatedExpiryPolicy 可用
AccessedExpiryPolicy 可用 可用
ModifiedExpiryPolicy 可用 可用
TouchedExpiryPolicy 可用 可用 可用
EternalExpiryPolicy

也可以自定义ExpiryPolicy实现。 过期策略可以在CacheConfiguration中进行设置,这个策略可以用于缓存内的所有条目。

IgniteCache<Object, Object> cache = cache.withExpiryPolicy(
    new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 5)));

也可以在对缓存进行单独操作时对过期策略进行设置或者修改。

IgniteCache<Object, Object> cache = cache.withExpiryPolicy(
    new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 5)));

此策略将用于在返回的缓存实例上调用的每个操作。

3.17.数据再平衡

当一个新节点加入集群时,已有节点会放弃一部分缓存条目的所有权转交给新的节点,以使整个网格在任何时候都保持键的均等平衡。 如果新的节点成为一些分区的主节点或者备份节点,他会从该分区之前的主节点获取数据,或者从该分区的备份节点之一获取数据。一旦分区全部载入新的节点,旧节点就会被标记为过时然后该节点在所有当前的事务完成之后最终会被退出。因此,在一些很短的时间段,在网络发生变化之后,有一种情况是在缓存中对于一个键备份的数量可能比事先配置的多。然而,一旦再平衡完成,额外的备份会被删除。

3.17.1.再平衡模式

下面的再平衡模式是在CacheRebalanceMode枚举中定义的:

缓存再平衡模式 描述
SYNC 同步再平衡模式,直到所有必要的数据全部从其他有效节点加载完毕分布式缓存才会启动,这意味着所有对缓存的开放API的调用都会阻塞直到再平衡结束
ASYNC 异步平衡模式,分布式缓存会马上启动,然后在后台会从其他节点加载所有必要的数据
NONE 该模式下不会发生再平衡,这意味着要么在访问数据时从持久化存储载入,要么数据被显式地填充。

默认启用ASYNC再平衡模式,要使用其他的再平衡模式,可以像下面这样设置CacheConfigurationrebalanceMode属性: XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">             
            <!-- Set synchronous rebalancing. -->
            <property name="rebalanceMode" value="SYNC"/>
            ... 
        </bean
    </property>
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.17.2.再平衡消息调节

当再平衡器将数据从一个节点传输到另一个节点时,他会将整个数据集拆分为多个批次然后将每一个批次作为一个单独的消息进行发送。如果数据集很大的话那么就会有很多的消息要发送,CPU和网络就会过度的消耗,这时在再平衡消息之间进行等待是合理的,以使由于再平衡过程导致的性能下降冲击最小化。这个时间间隔可以通过CacheConfigurationrebalanceThrottle属性进行控制,他的默认值是0,意味着在消息之间没有暂停,注意单个消息的大小也可以通过rebalanceBatchSize属性进行设置(默认值是512K)。 比如,如果希望再平衡器间隔100ms每个消息发送2MB数据,需要提供如下的配置。 XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="cacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">             
            <!-- Set batch size. -->
            <property name="rebalanceBatchSize" value="#{2 * 1024 * 1024}"/>

            <!-- Set throttle interval. -->
            <property name="rebalanceThrottle" value="100"/>
            ... 
        </bean
    </property>
</bean>

Java:

CacheConfiguration cacheCfg = new CacheConfiguration();

cacheCfg.setRebalanceBatchSize(2 * 1024 * 1024);

cacheCfg.setRebalanceThrottle(100);

IgniteConfiguration cfg = new IgniteConfiguration();

cfg.setCacheConfiguration(cacheCfg);

// Start Ignite node.
Ignition.start(cfg);

3.17.3.配置

缓存的再平衡行为可以有选择地通过下面的配置属性进行定制:

setter方法 描述 默认值
setRebalanceMode 分布式缓存的再平衡模式,细节可以参照再平衡模式章节 ASYNC
setRebalancePartitionedDelay 再平衡延迟时间,以毫秒为单位 0,无延迟
setRebalanceBatchSize 再平衡消息的大小,以byte为单位,再平衡算法会在发送数据之前将每个节点的整个数据集拆分成多个批次。 512K
setRebalanceThreadPoolSize 再平衡线程池的大小,注意这个大小只是一个提示,实现可能会为再平衡创建比这里指定的更多的线程(但从来不是更少的线程) 2
etRebalanceThrottle 再平衡消息之间的等待时间(毫秒),用来避免CPU和网络的过载,注意应用在再平衡的过程中仍然可以正确执行 0,无间隔
setRebalanceOrder 要完成的再平衡的顺序,只有同步和异步再平衡模式的缓存才可以将再平衡顺序设置为非0值,具有更小值的缓存再平衡会被首先完成,再平衡默认是无序的 0
setRebalanceTimeout 再平衡超时时间(毫秒) 10000

3.18.Web会话集群化

Ignite内存数据组织具有缓存所有兼容Java Servlet3.0规范的Java Servlet容器的Web Session的能力。包括Apache Tomcat,Eclipse Jetty,Oracle WebLogic以及其他的。 缓存Web会话对于运行一个应用服务器集群时是有用的。当在一个Servlet容器中运行一个Web应用时,可能面临性能和可扩展性的问题,一个单独的应用服务器通常可能无法自己处理很大的流量,一个常规的解决方案就是跨越多个集群实例扩展Web应用。 在上面的架构中,高可用代理(负载平衡器)在多个应用服务器实例之间分发请求(应用服务器1,应用服务器2……),来降低每个实例的负载以及提供在任意实例故障时的服务可用性,这里的问题就是Web会话的可用性。Web会话通过Cookie保持请求之间的中间逻辑状态,并且通常绑定到一个特定的应用实例。通常这是由粘性连接来处理,来确保来自同一个用户的请求被同一个应用服务器实例处理。然而,如果该实例故障,会话就丢失了,所有当前未保存的状态也丢失了,然后用户会重新创建它。 这里的一个解决方案就是用Ignite来缓存Web会话-维护每个创建的会话的拷贝的分布式缓存,在所有的实例中共享他们。如果任何一个应用实例故障,Ignite会马上恢复故障实例所属的会话,这样的话,随着Web会话被缓存粘性连接就变得不那么重要,因为会话可以用于请求被路由到的任何应用服务器。 这个章节给出了一个Ignite的Web会话缓存功能的主要架构概况以及介绍了如何配置Web应用来启用Web会话缓存。

3.18.1.架构

要用Ignite配置一个分布式Web会话缓存,通常需要将应用启动为一个Ignite节点(嵌入式模式),当多个应用服务器实例启动后,所有的Ignite节点会连姐在一起形成一个分布式缓存。

注意并不是所有的Ignite缓存节点都需要运行在应用服务器内部,也可以启动额外的,独立的Ignite节点,然后将他们加入集群。

3.18.2.复制策略

当将会话存储在Ignite中时有几个复制策略可供选择,本章节将主要覆盖最常用的配置。 全复制缓存 这个策略保存每个Ignite节点上的所有会话的拷贝,提供了最大的可用性。然而这个方法缓存的会话的数量必须匹配单个服务器的内存大小,另外,性能也会变差因为现在Web会话状态的每一次改变都必须复制到集群中所有的其他节点。 要启用全复制策略,设置缓存的cacheMode为REPLICATED

<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <!-- Cache mode. -->
    <property name="cacheMode" value="REPLICATED"/>
    ...
</bean>

有备份的分区缓存 在分区模式,Web会话会被拆分为区,每个节点只负责缓存分配给该节点的分区的数据,这个方法中如果有更多的节点,就可以缓存更多的数据,新的节点也可以加入以在运行中增加更多的内存。

分区模式中,冗余是通过为每个缓存的Web会话配置一定数量的备份实现的。

要开启分区策略,设置缓存的cacheMode为PARTITIONED以及通过CacheConfigurationbackups属性来配置备份的数量。

<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <!-- Cache mode. -->
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="backups" value="1"/>
</bean>

可以参照3.3.缓存模型来了解关于Ignite不同复制策略的更多信息。

3.18.3.过期和退出

当会话过期后会被缓存自动清理。然而,如果创建了大量的长期活动的会话,当缓存达到一个特定的限值时,为了节省内存可能需要将不必要的缓存退出。这个可以通过设置缓存的退出策略以及指定缓存中可以存储的会话的最大值来实现。比如,要启用基于LRU算法的自动退出以及10000个会话的限制,可以通过如下的缓存配置来实现:

<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <!-- Cache name. -->
    <property name="name" value="session-cache"/>

    <!-- Set up LRU eviction policy with 10000 sessions limit. -->
    <property name="evictionPolicy">
        <bean class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
            <property name="maxSize" value="10000"/>
        </bean>
    </property>
    ...
</bean>

要了解各个退出策略的更多信息,可以参照3.15.退出策略章节。

3.18.4.配置

要在应用中通过Ignite开启Web会话缓存,需要:

  1. 添加Ignite的jar包,下载Ignite然后将如下的jar包加入应用的类路径(WEB-INF/lib目录);

    • ignite-core.jar
    • cache-api-1.0.0.jar
    • ignite-web.jar
    • ignite-log4j.jar
    • ignite-spring.jar

或者,如果是一个基于Maven的工程,可以将下面的片段加入应用的pom.xmlz红:

<dependency>
      <groupId>org.apache.ignite</groupId>
      <artifactId>ignite-core</artifactId>
      <version> ${ignite.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-web</artifactId>
    <version> ${ignite.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-log4j</artifactId>
    <version>${ignite.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>${ignite.version}</version>
</dependency>

确保将${ignite.version}替换为实际的Ignite版本。

  1. 配置缓存模式,配置Ignite的缓存,要么是分区模式,要么是复制模式(可以看上面的例子);
  2. 更新web.xml,在web.xml中声明一个ContextListener和一个WebSessionsFilter:
 ...
<listener>
   <listener-class>org.apache.ignite.startup.servlet.ServletContextListenerStartup</listener-class>
</listener>

<filter>
   <filter-name>IgniteWebSessionsFilter</filter-name>
   <filter-class>org.apache.ignite.cache.websession.WebSessionFilter</filter-class>
</filter>

<!-- You can also specify a custom URL pattern. -->
<filter-mapping>
   <filter-name>IgniteWebSessionsFilter</filter-name>
   <url-pattern>/*</url-pattern>
</filter-mapping>

<!-- Specify Ignite configuration (relative to META-INF folder or Ignite_HOME). -->
<context-param>
   <param-name>IgniteConfigurationFilePath</param-name>
   <param-value>config/default-config.xml </param-value>
</context-param>

<!-- Specify the name of Ignite cache for web sessions. -->
<context-param>
   <param-name>IgniteWebSessionsCacheName</param-name>
   <param-value>partitioned</param-value>
</context-param>
...

在应用启动时,监听器会在应用中启动一个Ignite节点,他会连接网络中的其他节点以形成一个分布式缓存。

  1. 设置退出策略(可选),为缓存中的旧数据设置退出策略(可以看上面的例子)。

配置参数 ServletContextListenerStartup有如下的配置参数: |参数名|描述|默认值| |---|---|---| |IgniteConfigurationFilePath|Ignite配置文件的路径(相对于META-INF文件夹或者IGNITE_HOME)|/config/default-config.xml| WebSessionFilter有如下的配置参数: |参数名|描述|默认值| |---|---|---| |IgniteWebSessionsGridName|启动Ignite节点的网格名,可以参照配置文件的grid部分(如果配置文件中指定了网格名)|无| |IgniteWebSessionsCacheName|Web会话缓存的缓存名|无| |IgniteWebSessionsMaximumRetriesOnFail|只对ATOMIC缓存有效,指定了当主节点故障时的重试次数|3|

3.18.5.支持的应用服务器

Ignite官方测试了如下的应用服务器:

  • Apache Tomcat 7
  • Eclipse Jetty 9
  • Apache Tomcat 6
  • Oracle WebLogic >= 10.3.4

3.19.Hibernate二级缓存

Ignite可以用做Hibernate的二级缓存,它可以显著地提升应用持久化层的性能。 Hibernate是著名的、应用广泛的对象关系映射框架(ORM),在与SQL数据库紧密互动的同时,他通过对查询结果集的缓存来最小化昂贵的数据库请求。 Hibernate数据库映射对象的所有工作都是在一个会话中完成的,通常绑定到一个worker线程或者Web会话。默认的话,Hibernate只会使用Session级的缓存(L1),因此,缓存在一个会话中的对象,对于另一个会话是不可见的。然而,如果用一个全局的二级缓存的话,他缓存的所有对象对于用同一个缓存配置的所有会话都是可见的。这通常会带来性能的显著提升,因为每一个新创建的会话都可以利用L2缓存(他比任何会话级L1缓存都要长寿)中已有的数据的好处。 L1缓存是一直启用的而且是由Hibernate内部实现的,而L2缓存是可选的而且有很多的可插拔的实现。Ignite可以作为L2缓存的实现非常容易地嵌入,而且可以用于所有的访问模式(READ_ONLY,READ_WRITE,NONSTRICT_READ_WRITETRANSACTIONAL),支持广泛的相关特性:

  • 缓存到内存和磁盘以及堆外内存
  • 缓存事务
  • 集群,有2种不同的复制模式,复制分区

如果要将Ignite作为Hibernate的二级缓存,需要简单的三个步骤:

  • 将Ignite的库文件添加到应用的类路径;
  • 启用二级缓存以及在二级缓存的配置文件中指定Ignite的实现类;
  • 为二级缓存配置Ignite缓存区域以及启动嵌入式的Ignite节点(也可以选择外部的节点)。

本章节的后面会详细介绍这些步骤的细节。

3.19.1.二级缓存配置

要将Ignite配置为Hibernate的二级缓存,不需要修改已有的Hibernate代码,只需要:

  • 添加hibernate-ignite模块的依赖;
  • 配置Hibernate使用Ignite作为二级缓存
  • 适当地配置Ignite缓存

Maven配置

Maven依赖 为了开启Ignite的hibernate集成,需要在工程里面添加ignite-hibernate的依赖,或者在从命令行启动之前,从libs/optional中将ignite-hibernate模块拷贝到libs文件夹。

要在项目中添加Ignite-hibernate集成,需要将下面的依赖加入POM文件:

<dependency>
  <groupId>org.apache.ignite</groupId>
  <artifactId>ignite-hibernate</artifactId>
  <version>RELEASE</version>
</dependency>

Hibernate配置示例 一个用Ignite配置Hibernate二级缓存的典型例子看上去像下面这样:

<hibernate-configuration>
    <session-factory>
        ...
        <!-- Enable L2 cache. -->
        <property name="cache.use_second_level_cache">true</property>

        <!-- Generate L2 cache statistics. -->
        <property name="generate_statistics">true</property>

        <!-- Specify GridGain as L2 cache provider. -->
        <property name="cache.region.factory_class">org.apache.ignite.cache.hibernate.HibernateRegionFactory</property>

        <!-- Specify the name of the grid, that will be used for second level caching. -->
        <property name="org.apache.ignite.hibernate.grid_name">hibernate-grid</property>

        <!-- Set default L2 cache access type. -->
        <property name="org.apache.ignite.hibernate.default_access_type">READ_ONLY</property>

        <!-- Specify the entity classes for mapping. -->
        <mapping class="com.mycompany.MyEntity1"/>
        <mapping class="com.mycompany.MyEntity2"/>

        <!-- Per-class L2 cache settings. -->
        <class-cache class="com.mycompany.MyEntity1" usage="read-only"/>
        <class-cache class="com.mycompany.MyEntity2" usage="read-only"/>
        <collection-cache collection="com.mycompany.MyEntity1.children" usage="read-only"/>
        ...
    </session-factory>
</hibernate-configuration>

这里,我们做了如下工作:

  • 开启了二级缓存(可选地生成二级缓存的统计)
  • 指定Ignite作为二级缓存的实现
  • 指定缓存网格的名字(需要和Ignite配置文件中的保持一致)
  • 指定实体类以及为每个类配置缓存(Ignite中应该配置一个相应的缓存区域)

Ignite配置示例 一个典型的支持Hibernate二级缓存的Ignite配置,像下面这样:

<!-- Basic configuration for atomic cache. -->
<bean id="atomic-cache" class="org.apache.ignite.configutation.CacheConfiguration" abstract="true">
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="atomicityMode" value="ATOMIC"/>
    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
</bean>

<!-- Basic configuration for transactional cache. -->
<bean id="transactional-cache" class="org.apache.ignite.configutation.CacheConfiguration" abstract="true">
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="atomicityMode" value="TRANSACTIONAL"/>
    <property name="writeSynchronizationMode" value="FULL_SYNC"/>
</bean>

<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
    <!-- 
        Specify the name of the caching grid (should correspond to the 
        one in Hibernate configuration).
    -->
    <property name="gridName" value="hibernate-grid"/>
    ...
    <!-- 
        Specify cache configuration for each L2 cache region (which corresponds 
        to a full class name or a full association name).
    -->
    <property name="cacheConfiguration">
        <list>
            <!--
                Configurations for entity caches.
            -->
            <bean parent="transactional-cache">
                <property name="name" value="com.mycompany.MyEntity1"/>
            </bean>
            <bean parent="transactional-cache">
                <property name="name" value="com.mycompany.MyEntity2"/>
            </bean>
            <bean parent="transactional-cache">
                <property name="name" value="com.mycompany.MyEntity1.children"/>
            </bean>

            <!-- Configuration for update timestamps cache. -->
            <bean parent="atomic-cache">
                <property name="name" value="org.hibernate.cache.spi.UpdateTimestampsCache"/>
            </bean>

            <!-- Configuration for query result cache. -->
            <bean parent="atomic-cache">
                <property name="name" value="org.hibernate.cache.internal.StandardQueryCache"/>
            </bean>
        </list>
    </property>
    ...
</bean>

上面的代码为每个二级缓存区域指定了缓存的配置:

  • 使用分区缓存在缓存节点间拆分数据,其他的策略也可以选择复制模式,这样的话就在所有缓存节点上复制完整的数据集,可以参照相关的章节以了解更多的信息。
  • 指定与二级缓存区域名一致的缓存名(或者是完整类名或者是完整的关系名)
  • 事务原子化模式来利用缓存事务的优势
  • 开启FULL_SYNC模式保持备份节点的完全同步

另外,指定了一个缓存来更新时间戳,它可以是原子化的,因为性能好。 配置完Ignite缓存节点后,可以通过如下方式在节点内启动他:

Ignition.start("my-config-folder/my-ignite-configuration.xml");

上述代码执行完毕后,内部的节点就启动了然后准备缓存数据,也可以从控制台执行如下命令来启动额外的独立的节点:

$IGNITE_HOME/bin/ignite.sh my-config-folder/my-ignite-configuration.xml

对于Windows,可以执行同一文件夹下的.bat脚本。

节点也可以在其他主机上启动,以形成一个分布式的缓存集群,一定要确保在配置文件中指定正确的网络参数。

3.19.2.查询缓存

除了二级缓存,Hibernate还提供了查询缓存,这个缓存存储了通过指定参数集进行查询的结果(或者是HQL或者是Criteria),因此,当重复用同样的参数集进行查询时,他会命中缓存而不会去访问数据库。 查询缓存对于反复用同样的参数集进行查询时是有用的。像二级缓存的场景一样,Hibernate依赖于一个第三方的缓存实现,Ignite也可以这样用。

要考虑Ignite中对于基于SQL的内存内查询的支持,他会比通过Hibernate性能更好。

3.19.3.查询缓存配置

上面的配置信息完全适用于查询缓存,但是额外的配置和代码变更还是必要的。 Hibernate配置 要在Hibernate种启用查询缓存,只需要在配置文件中添加额外的一行:

<!-- Enable query cache. -->
<property name="cache.use_query_cache">true</property>

然后,需要对代码做出修改,对于要缓存的每一个查询,都需要通过调用setCacheable(true)来开启cacheable标志。


Session ses = ...;

// Create Criteria query.
Criteria criteria = ses.createCriteria(cls);

// Enable cacheable flag.
criteria.setCacheable(true);

...

这个完成之后,查询结果就会被缓存了。 Ignite配置 要在Ignite中开启Hibernate查询缓存,需要指定一个额外的缓存配置:

<property name="cacheConfiguration">
    <list>
        ...
        <!-- Query cache (refers to atomic cache defined in above example). -->
        <bean parent="atomic-cache">
            <property name="name" value="org.hibernate.cache.internal.StandardQueryCache"/>
        </bean>
    </list>
</property>

注意为了更好的性能缓存配置为原子化的。

3.20.JDBC驱动

Ignite提供了JDBC驱动,他可以通过标准SQL和JDBC API来从缓存中获得分布式数据。

3.20.1.JDBC连接

jdbc:ignite:cfg://[<params>@]<config_url>
  • <config_url>是必须的,表示指向Ignite配置文件的任意合法URL,要了解更多细节可以参照1.6.客户端和服务器章节。
  • <params>是可选的,格式如下:
param1=value1:param2=value2:...:paramN=valueN

他支持如下的参数:

属性 描述 默认值
cache 缓存名,如果未定义会使用默认的缓存,区分大小写
nodeId 要执行的查询所在节点的Id,对于在本地查询是有用的
local 查询只在本地节点执行,这个参数和nodeId参数都是通过指定节点来限制数据集 false
collocated 优化标志,当Ignite执行一个分布式查询时,他会向单个的集群节点发送子查询,如果提前知道要查询的数据已经被并置到同一个节点,Ignite会有显著的性能提升和网络优化 false

跨缓存查询 驱动连接到的缓存会被视为默认的模式,要跨越多个缓存进行查询,可以参照3.6.缓存查询章节。

关联和并置 就像3.6.缓存查询章节描述的那样,通过IgniteCacheAPI,如果关联对象是以并置模式存储的话,在分区缓存上的关联是可以正常执行的。细节可以参照3.11.关系并置章节。

复制和分区缓存复制缓存上的查询会直接在一个节点上执行,而在分区缓存上的查询是分布在所有缓存节点上的。

3.20.2.示例

Ignite JDBC驱动会自动地只获取缓存中存储的对象中实际需要的那些字段,比如,有一个像下面这样的Person对象。

public class Person {
    @QuerySqlField
    private String name;

    @QuerySqlField
    private int age;

    // Getters and setters.
    ...
}

如果在缓存中有这些类的实例,可以通过标准JDBC API 查询单独的字段(name,age或者两个),比如:

// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");

// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");

// Query names of all people.
ResultSet rs = conn.createStatement().executeQuery("select name from Person");

while (rs.next()) {
    String name = rs.getString(1);
    ...
}

// Query people with specific age using prepared statement.
PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");

stmt.setInt(1, 30);

ResultSet rs = stmt.executeQuery();

while (rs.next()) {
    String name = rs.getString("name");
    int age = rs.getInt("age");
    ...
}

3.20.3.后向兼容

对于之前版本的Ignite(1.4之前),JDBC连接的URL有如下的格式:

jdbc:ignite://<hostname>:<port>/<cache_name>

细节可以参照文档

3.21.Spring缓存

Ignite提供了一个SpringCacheManager-一个Spring缓存抽象的实现。他提供了基于注解的方式来启用Java方法的缓存,这样方法的执行结果就会存储在Ignite缓存中。如果之后同一个方法通过同样的参数集被调用,结果会直接从缓存中获得而不是实际执行这个方法。

Spring缓存抽象文档 关于如何使用Spring缓存抽象的更多信息,包括可用的注解,可以参照这个文档页面:http://docs.spring.io/spring/docs/current/spring-framework-reference/html/cache.html.

3.21.1.如何启用缓存

只需要两个简单的步骤就可以将Ignite缓存嵌入基于Spring的应用:

  • 在嵌入式模式中使用正确的配置文件启动一个Ignite节点(即应用运行的同一个JVM)。他也可以有预定义的缓存,但不是必须的-如果必要缓存会在第一次访问时自动创建。
  • 在Spring应用上下文中配置SpringCacheManager作为缓存管理器。

嵌入式节点可以通过SpringCacheManager自己启动,这种情况下需要分别通过configurationPath或者configuration属性提供一个Ignite配置文件的路径或者IgniteConfigurationBean(看下面的例子)。注意同时设置两个属性是非法的以及抛出IllegalArgumentException。 配置路径:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/cache
         http://www.springframework.org/schema/cache/spring-cache.xsd">
    <!-- Provide configuration file path. -->
    <bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
        <property name="configurationPath" value="examples/config/spring-cache.xml"/>
    </bean>

    <!-- Enable annotation-driven caching. -->
    <cache:annotation-driven/>
</beans>

配置Bean:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/cache
         http://www.springframework.org/schema/cache/spring-cache.xsd">
    <-- Provide configuration bean. -->
    <bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
        <property name="configuration">
            <bean class="org.apache.ignite.configuration.IgniteConfiguration">
                 ...
            </bean>
        </property>
    </bean>

    <-- Enable annotation-driven caching. -->
    <cache:annotation-driven/>
</beans>

当缓存管理器初始化时也可能已经有一个Ignite节点正在运行(比如已经通过ServletContextListenerStartup启动了)。这时只需要简单地通过gridName属性提供网格名字就可以了。注意如果不设置网格名字,缓存管理器会试图使用默认的Ignite实例(名字为空的),下面是一个例子: 使用已启动的Ignite实例:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:cache="http://www.springframework.org/schema/cache"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/cache
         http://www.springframework.org/schema/cache/spring-cache.xsd">
    <!-- Provide grid name. -->
    <bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
        <property name="gridName" value="myGrid"/>
    </bean>

    <!-- Enable annotation-driven caching. -->
    <cache:annotation-driven/>
</beans>

远程节点 注意应用内部启动的节点只是希望连接的网络的一个入口,可以根据需要通过Ignite发行版提供的bin/ignite.{sh|bat}脚本启动尽可能多的远程独立节点,所有这些节点都会参与缓存数据。

3.21.2.动态缓存

虽然通过Ignite配置文件可以获得所有必要的缓存,但是这不是必要的。如果Spring要使用一个不存在的缓存时,SpringCacheManager会自动创建它。 如果不指定,会使用默认值创建一个新的缓存。要定制的话,可以通过dynamicCacheConfiguration属性提供一个配置模板,比如,如果希望使用复制缓存而不是分区缓存,可以像下面这样配置SpringCacheManager:

<bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
    ...

    <property name="dynamicCacheConfiguration">
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
            <property name="cacheMode" value="REPLICATED"/>
        </bean>
    </property>
</bean>

也可以在客户端侧使用近缓存,要做到这一点只需要简单地通过dynamicNearCacheConfiguration属性提供一个近缓存配置即可。默认的话近缓存是不启用的,下面是一个例子:

<bean id="cacheManager" class="org.apache.ignite.cache.spring.SpringCacheManager">
    ...

    <property name="dynamicNearCacheConfiguration">
        <bean class="org.apache.ignite.configuration.NearCacheConfiguration">
            <property name="nearStartSize" value="1000"/>
        </bean>
    </property>
</bean>

3.21.3.示例

一旦在Spring应用上下文中加入了SpringCacheManager,就可以通过简单地加上注解为任意的java方法启用缓存。 通常为很重的操作使用缓存,比如数据库访问。比如,假设有个Dao类有一个averageSalary(...)方法,他计算一个组织内的所有雇员的平均工资,那么可以通过@Cacheable注解来开启这个方法的缓存。

private JdbcTemplate jdbc;

@Cacheable("averageSalary")
public long averageSalary(int organizationId) {
    String sql =
        "SELECT AVG(e.salary) " +
        "FROM Employee e " +
        "WHERE e.organizationId = ?";

    return jdbc.queryForObject(sql, Long.class, organizationId);
}

当这个方法第一次被调用时,SpringCacheManager会自动创建一个averageSalary缓存,他也会在缓存中查找事先计算好的平均值然后如果存在的话就会直接返回,如果这个组织的平均值还没有被计算过,那么这个方法就会被调用然后将结果保存在缓存中,因此下一次请求这个组织的平均值,就不需要访问数据库了。

缓存键 因为organizationId是唯一的方法参数,所以他会自动作为缓存键。

如果一个雇员的工资发生变化,可能希望从缓存中删除这个雇员所属组织的平均值,否则averageSalary(...)方法会返回过时的缓存结果。这个可以通过将@CacheEvict注解加到一个方法上来更新雇员的工资:

private JdbcTemplate jdbc;

@CacheEvict(value = "averageSalary", key = "#e.organizationId")
public void updateSalary(Employee e) {
    String sql =
        "UPDATE Employee " +
        "SET salary = ? " +
        "WHERE id = ?";

    jdbc.update(sql, e.getSalary(), e.getId());
}

在这个方法被调用之后,这个雇员所属组织的平均值就会被从averageSalary缓存中踢出,这会强迫averageSalary(...)方法在下一次调用时重新计算。

Spring表达式语言(SpEL) 注意这个方法是以雇员为参数的,而平均值是通过组织的Id将平均值存储在缓存中的。为了明确地指定什么作为缓存键,可以使用注解的key参数和Spring表达式语言#e.organizationId表达式的意思是从e变量中获取organizationId属性的值。本质上会在提供的雇员对象上调用getOrganizationId()方法,以及将返回的值作为缓存键。

3.22.拓扑验证

拓扑验证器用于验证集群网络拓扑对于未来的缓存操作是否有效。 拓扑验证器在每次集群拓扑发生变化时都会被调用(或者新节点加入或者已有节点故障或者其他的)。如果没有配置拓扑验证器,那么集群拓扑会被认为一直有效。 当TopologyValidator.validate(Collection)方法返回true时,那么对于特定的缓存以及在这个缓存上的所有有效操作拓扑都会被认为是有效的,否则,该缓存上的所有更新操作都会抛出如下异常:

  • CacheException:所有试图更新的操作都会抛出(put,remove等)
  • IgniteException:试图进行事务提交的操作会抛出

返回false以及声明拓扑无效后,当下一次拓扑发生变化时拓扑验证器可以返回正常状态。 示例:

...
for (CacheConfiguration cCfg : iCfg.getCacheConfiguration()) {
    if (cCfg.getName() != null) {
        if (cCfg.getName().equals(CACHE_NAME_1))
            cCfg.setTopologyValidator(new TopologyValidator() {
                @Override public boolean validate(Collection<ClusterNode> nodes) {
                    return nodes.size() == 2;
                }
            });
        else if (cCfg.getName().equals(CACHE_NAME_2))
            cCfg.setTopologyValidator(new TopologyValidator() {
                @Override public boolean validate(Collection<ClusterNode> nodes) {
                    return nodes.size() >= 2;
                }
            });
    }
}
...

在这个例子中,对缓存允许更新操作情况如下:

  • CACHE_NAME_1:集群具有两个节点时
  • CACHE_NAME_2:集群至少有两个节点时

配置 拓扑验证器通过CacheConfiguration.setTopologyValidator(TopologyValidator)方法既可以用代码也可以用XML进行配置。