标签:Apache-Ignite-1.4.0-中文开发手册
[TOC]
11.分布式事件
11.1.本地和远程事件
Ignite分布式事件功能使得在分布式集群环境下发生各种各样事件时应用可以接收到通知。可以自动获得比如任务执行、发生在本地或者远程节点上的读写或者查询操作的通知。
分布式事件功能是通过IgniteEvents
接口提供的,可以通过如下方式从Ignite中获得IgniteEvents
的实例:
Ignite ignite = Ignition.ignite();
IgniteEvents evts = ignite.events();
11.1.1.订阅事件
listen方法可以用于接收集群内发生的指定事件的通知,这些方法在本地或者远程节点上注册了一个指定事件的监听器,当在该节点上发生该事件时,会通知该监听器。
本地事件
localListen(...)
方法只在本地节点上针对指定事件注册事件监听器。
远程事件
remoteListen(...)
方法会在集群或者集群组内的所有节点上针对指定事件注册监听器。
下面是每个方法的示例:
Java8:本地监听:
Ignite ignite = Ignition.ignite();
// Local listener that listenes to local events.
IgnitePredicate<CacheEvent> locLsnr = evt -> {
System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
", oldVal=" + evt.oldValue() + ", newVal=" + evt.newValue());
return true; // Continue listening.
};
// Subscribe to specified cache events occuring on local node.
ignite.events().localListen(locLsnr,
EventType.EVT_CACHE_OBJECT_PUT,
EventType.EVT_CACHE_OBJECT_READ,
EventType.EVT_CACHE_OBJECT_REMOVED);
// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.cache("cacheName");
// Generate cache events.
for (int i = 0; i < 20; i++)
cache.put(i, Integer.toString(i));
Java8:远程监听:
Ignite ignite = Ignition.ignite();
// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.jcache("cacheName");
// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10.
IgnitePredicate<CacheEvent> rmtLsnr = evt -> evt.<Integer>key() >= 10;
// Subscribe to specified cache events on all nodes that have cache running.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(null, rmtLsnr, EventType.EVT_CACHE_OBJECT_PUT,
EventType.EVT_CACHE_OBJECT_READ,
EventType.EVT_CACHE_OBJECT_REMOVED);
// Generate cache events.
for (int i = 0; i < 20; i++)
cache.put(i, Integer.toString(i));
Java7:监听:
Ignite ignite = Ignition.ignite();
// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.jcache("cacheName");
// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10.
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
@Override public boolean apply(CacheEvent evt) {
System.out.println("Cache event: " + evt);
int key = evt.key();
return key >= 10;
}
};
// Subscribe to specified cache events occuring on
// all nodes that have the specified cache running.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(null, rmtLsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_READ, EVT_CACHE_OBJECT_REMOVED);
// Generate cache events.
for (int i = 0; i < 20; i++)
cache.put(i, Integer.toString(i));
在上述示例中,EVT_CACHE_OBJECT_PUT
,EVT_CACHE_OBJECT_READ
,EVT_CACHE_OBJECT_REMOVED
是在EventType
接口中预定义的事件类型常量。
EventType
接口定义了监听方法可用的各种事件类型常量,可以在相关的javadoc中看到这些事件类型的完整列表。作为参数传入
localListen(...)
和remoteListen(...)
方法的事件类型还必须在IgniteConfiguration
中进行配置,可以参照下面的11.1.3.配置
章节。
11.1.2.事件的查询
系统生成的所有事件都会保持在本地节点的本地,IgniteEvents
API提供了查询这些事件的方法。
本地事件
localQuery(...)
方法通过传入的谓词过滤器在本地节点上进行事件的查询。如果满足了所有的条件,就会返回一个本地节点发生的所有事件的集合。
远程事件
remoteQuery(...)
方法通过传入的谓词过滤器在远程节点上进行事件的异步查询。这个操作是分布式的,因此可能在通信层发生故障而且通常也会比本地事件通知花费更多的时间,注意这个方法是非阻塞的,然后附带future立即返回。
11.1.3.配置
要获得集群内发生的任意任务或者缓存事件的通知,IgniteConfiguration
的includeEventTypes
属性必须启用:
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<!-- Enable cache events. -->
<property name="includeEventTypes">
<util:constant static-field="org.apache.ignite.events.EventType.EVTS_CACHE"/>
</property>
...
</bean>
Java:
IgniteConfiguration cfg = new IgniteConfiguration();
// Enable cache events.
cfg.setIncludeEventTypes(EVTS_CACHE);
// Start Ignite node.
Ignition.start(cfg);
默认的话,因为性能原因事件通知是关闭的。
因为每秒生成上千的事件,他会在系统中产生额外的负载,这会导致显著的性能下降。因此强烈建议只有在应用逻辑必需时才启用这些事件。
11.2.自动化批处理
Ignite会自动地对集群内发生的,作为缓存事件的结果生成的事件通知进行分组或者分批处理。 缓存内的每个事件都会导致一个事件通知被生成以及发送,对于缓存活动频繁的系统,获取每个事件的通知都将是网络密集的,可能导致集群内缓存操作的性能下降。 Ignite中,事件通知可以被分组然后分批地或者定时地发送,下面是一个如何实现这一点的示例:
Ignite ignite = Ignition.ignite();
// Get an instance of named cache.
final IgniteCache<Integer, String> cache = ignite.jcache("cacheName");
// Sample remote filter which only accepts events for keys
// that are greater than or equal to 10.
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
@Override public boolean apply(CacheEvent evt) {
System.out.println("Cache event: " + evt);
int key = evt.key();
return key >= 10;
}
};
// Subscribe to cache events occuring on all nodes
// that have the specified cache running.
// Send notifications in batches of 10.
ignite.events(ignite.cluster().forCacheNodes("cacheName")).remoteListen(
10 /*batch size*/, 0 /*time intervals*/, false, null, rmtLsnr, EVTS_CACHE);
// Generate cache events.
for (int i = 0; i < 20; i++)
cache.put(i, Integer.toString(i));