标签:Apache-Ignite-1.4.0-中文开发手册

[TOC]

6.分布式数据结构

6.1.队列和集合

Ignite除了提供了标准的键-值的类似于Map的存储以外,也提供了一个快速的分布式阻塞队列和分布式集合的实现。 IgniteQueueIgniteSet分别是java.util.concurrent.BlockingQueuejava.util.Set接口的实现,也支持java.util.Collection接口的所有功能,这两个实现既可以以并置模式也可以以非并置模式创建。 下面是一个如何创建分布式队列和集合的例子: 队列:

Ignite ignite = Ignition.ignite();

IgniteQueue<String> queue = ignite.queue(
    "queueName", // Queue name.
    0,          // Queue capacity. 0 for unbounded queue.
    null         // Collection configuration.
);

集合:

Ignite ignite = Ignition.ignite();

IgniteSet<String> set = ignite.set(
    "setName", // Queue name.
    null       // Collection configuration.
);

6.1.1.并置和非并置模式

如果只打算创建包含大量数据的几个Queue或者Set,那么应该以非并置模式创建,这会确保每个集群节点存储每个队列或者集合大体均等的一部分。另一方面,如果打算持有很多的队列或者集合,而大小又相对较小(和整个缓存比),那么以并置模式创建他们是更合理的。这个模式下所有的队列和集合元素都会存储在同一个集群节点上,但是每个节点会被赋予均等的队列或者集合数量。 一个并置模式的队列或者集合可以通过CollectionConfigurationcollocated属性来创建,像下面这样: 队列:

Ignite ignite = Ignition.ignite();

CollectionConfiguration colCfg = new CollectionConfiguration();

colCfg.setCollocated(true); 

// Create collocated queue.
IgniteQueue<String> queue = ignite.queue("queueName", 0, colCfg);

集合:

Ignite ignite = Ignition.ignite();

CollectionConfiguration colCfg = new CollectionConfiguration();

colCfg.setCollocated(true); 

// Create collocated set.
IgniteSet<String> set = ignite.set("setName", colCfg);

非并置模式只对分区缓存才有意义,也只有分区缓存才支持。

6.1.2.有界队列

有界队列使得可以拥有很多的有最大容量的队列,这样可以更好地控制整体缓存的容量,他们既可以是并置的也可以是非并置的。当有界队列较小而且用于并置模式时,所有的队列操作变得很快。此外,当与计算网格一起用时,可以将计算作业配置在队列所在的集群节点上,这样可以确保所有的操作都是本地化的以及没有(或者最小化)数据分布化。 下面的代码显示了一个作业如何直接发给队列所在的节点:

Ignite ignite = Ignition.ignite();

CollectionConfiguration colCfg = new CollectionConfiguration();

colCfg.setCollocated(true); 

final IgniteQueue<String> queue = ignite.queue("queueName", 20, colCfg);

// Add queue elements (queue is cached on some node).
for (int i = 0; i < 20; i++)
    queue.add("Value " + Integer.toString(i));

IgniteRunnable queuePoller = new IgniteRunnable() {
    @Override public void run() throws IgniteException {
        // Poll is local operation due to collocation.
        for (int i = 0; i < 20; i++)
            System.out.println("Polled element: " + queue.poll());
    }
};

// Drain queue on the node where the queue is cached.
ignite.compute().affinityRun("cacheName", "queueName", queuePoller);

参照8.6.计算和数据的并置章节,可以了解计算和数据的并置的更多信息。

6.1.3.缓存队列和负载平衡

特定的元素会留在队列中直到被读取,以及没有两个节点会从队列中得到同一个元素。在Ignite中缓存队列会被用做一个备用的工作以及负载平衡的方式。 比如,可以简单地将一个计算,比如一个IgniteRunnable的实例加入队列,然后远程节点上有线程来调用IgniteQueue.take()方法,如果队列为空的话会阻塞,如果take()方法返回一个作业,一个线程会处理他然后再次调用take()方法来获取下一个作业。通过这个方式,远程节点的线程只有在前一个作业完成之后才会开启下一个作业,因此创建一个理想的平衡系统,即每个节点只领取他能处理的作业数量,而不是更多。

6.1.4.集合配置

Ignite的集合可以通过CollectionConfiguration的API进行配置(可以看上面的例子),可以选择下面的参数进行配置: |setter方法|描述|默认值| |---|---|---| |setCollocated(boolean)|设置并置模式|false|

6.2.原子类型

Ignite支持分布式的原子类型longreference,分别类似于java.util.concurrent.atomic.AtomicLongjava.util.concurrent.atomic.AtomicReference。 Ignite的原子性是跨集群分布式的,从根本上支持了对全局可见的数值的原子性操作(比如增量-获取或者比较-赋值)。比如,可以更新一个节点上的原子性的long类型值,然后从另一个节点读取。

特性一览

  • 获取当前值
  • 原子化修改当前值
  • 原子化地对当前值进行增量或者减量
  • 原子化地和新值进行比较以及设置新值

分布式原子化的long和reference可以分别通过IgniteAtomicLongIgniteAtomicReference获得,像下面这样: AtomicLong:

Ignite ignite = Ignition.ignite();

IgniteAtomicLong atomicLong = ignite.atomicLong(
    "atomicName", // Atomic long name.
    0,            // Initial value.
    false         // Create if it does not exist.
)

AtomicReference:

Ignite ignite = Ignition.ignite();

// Create an AtomicReference.
IgniteAtomicReference<Boolean> ref = ignite.atomicReference(
    "refName",  // Reference name.
    "someVal",  // Initial value for atomic reference.
    true        // Create if it does not exist.
);

下面是使用IgniteAtomicLongIgniteAtomicReference的示例: AtomicLong:

Ignite ignite = Ignition.ignite();

// Initialize atomic long.
final IgniteAtomicLong atomicLong = ignite.atomicLong("atomicName", 0, true);

// Increment atomic long on local node.
System.out.println("Incremented value: " + atomicLong.incrementAndGet());

AtomicReference:

Ignite ignite = Ignition.ignite();

// Initialize atomic reference.
IgniteAtomicReference<String> ref = ignite.atomicReference("refName", "someVal", true);

// Compare old value to new value and if they are equal,
//only then set the old value to new value.
ref.compareAndSet("WRONG EXPECTED VALUE", "someNewVal"); // Won't change.

通过IgniteAtomicLongIgniteAtomicReference提供的所有原子性操作都是同步的,一个原子性操作花费的时间依赖于与同一个原子性long类型的实例执行并发操作的节点数量,操作的强度以及网络的延时。

IgniteCache接口有putIfAbsent()replace()方法,他们和原子类型一样提供了同样的CAS功能。

6.2.1.原子化的配置

Ignite的原子化可以通过IgniteConfigurationatomicConfiguration属性进行配置,有如下的配置参数可选:

setter方法 描述 默认值
setBackups(int) 设置备份的数量 0
setCacheMode(CacheMode) 为所有的原子类型设置缓存模式 分区模式
setCacheMode(CacheMode) 设置为IgniteAtomicSequence接口预留的序列值的数量 1000

示例 XML:

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    ...
    <property name="atomicConfiguration">
        <bean class="org.apache.ignite.configuration.AtomicConfiguration">
            <!-- Set number of backups. -->
            <property name="backups" value="1"/>

            <!-- Set number of sequence values to be reserved. -->
            <property name="atomicSequenceReserveSize" value="5000"/>
        </bean>
    </property>
</bean>

Java:

AtomicConfiguration atomicCfg = new AtomicConfiguration();

// Set number of backups.
atomicCfg.setBackups(1);

// Set number of sequence values to be reserved. 
atomicCfg.setAtomicSequenceReserveSize(5000);

IgniteConfiguration cfg = new IgniteConfiguration();

// Use atomic configuration in Ignite configuration.
cfg.setAtomicConfiguration(atomicCfg);

// Start Ignite node.
Ignition.start(cfg);

6.3.CountDownLatch

如果熟悉关于单一JVM内多线程间同步的java.util.concurrent.CountDownLatch,Ignite也提供了支持跨集群节点类似行为的IgniteCountDownLatch。 Ignite中的CountDownLatch可以用如下方式创建:

Ignite ignite = Ignition.ignite();

IgniteCountDownLatch latch = ignite.countDownLatch(
    "latchName", // Latch name.
    10,          // Initial count.
    false        // Auto remove, when counter has reached zero.
    true         // Create if it does not exist.
);

上述代码执行之后,指定缓存的所有节点将能够同步以latchName为名的锁。下面就是这个同步机制的示例:

Ignite ignite = Ignition.ignite();

final IgniteCountDownLatch latch = ignite.countDownLatch("latchName", 10, false, true);

// Execute jobs.
for (int i = 0; i < 10; i++)
    // Execute a job on some remote cluster node.
    ignite.compute().run(() -> {
        int newCnt = latch.countDown();

        System.out.println("Counted down: newCnt=" + newCnt);
    });

// Wait for all jobs to complete.
latch.await();

6.4.ID生成器

IgniteCacheAtomicSequence接口提供的分布式原子性序列类似于分布式原子性的long类型,但是它的值只能增长。他也支持预留一定范围的序列值,来避免每次一个序列必须提供下一个值时导致的昂贵的网络消耗以及缓存更新。也就是,当在一个原子性序列上执行了incrementAndGet()(或者任何其他的原子性操作),数据结构会往前预留一定范围的序列值,他会保证对于这个序列实例来说跨集群的唯一性。 下面的例子显示了如何创建原子性序列:

Ignite ignite = Ignition.ignite();

IgniteAtomicSequence seq = ignite.atomicSequence(
    "seqName", // Sequence name.
    0,       // Initial value for sequence.
    true     // Create if it does not exist.
);

下面是一个简单的使用样例:

Ignite ignite = Ignition.ignite();

// Initialize atomic sequence.
final IgniteAtomicSequence seq = ignite.atomicSequence("seqName", 0, true);

// Increment atomic sequence.
for (int i = 0; i < 20; i++) {
  long currentValue = seq.get();
  long newValue = seq.incrementAndGet();

  ...
}

6.4.1.序列预留大小

IgniteAtomicSequence的关键参数是atomicSequenceReserveSize,他是每个节点序列值的预留数量。当一个节点试图获得IgniteAtomicSequence的实例时,一定数量的序列值会为该节点预留,然后随之而来的序列增量会在本地发生而不需要与其他节点通信,直到下一个预留操作发生。 atomicSequenceReserveSize的默认值是1000,这个默认值可以通过AtomicConfigurationatomicSequenceReserveSize属性进行修改。

要了解各种原子性配置参数的详细信息,可以参照6.2.原子性类型章节,以及如何配置他们的样例。