标签:Apache-Ignite-1.4.0-中文开发手册
[TOC]
8.计算网格
8.1.计算网格
分布式计算是通过以并行的方式执行来获得高性能、低延迟和线性可扩展。Ignite计算网格提供了一套简单的API,使得可以在集群内的多台计算机上执行分布式计算和数据处理。分布式并行处理是基于在任何集群节点集合上进行计算和执行然后将结果返回的能力实现的。
特性一览
- 分布式闭包执行
- MapReduce和ForkJoin处理
- 集群化Executor Service
- 计算和数据的并置
- 负载平衡
- 容错
- 作业状态检查点
- 作业调度
8.1.1.IgniteCompute
IgniteCompute
接口提供了在集群节点或者一个集群组中运行很多种类型计算的方法,这些方法可用于以分布式的形式执行任务或者闭包。
只要至少有一个节点有效,所有的作业和闭包就会保证得到执行,如果一个作业的执行由于资源不足被踢出,他会提供一个故障转移的机制。如果发生故障,负载平衡器会选择下一个有效的节点来执行该作业,下面的代码显示了如何获得IgniteCompute
实例:
Ignite ignite = Ignition.ignite();
// Get compute instance over all nodes in the cluster.
IgniteCompute compute = ignite.compute();
也可以通过集群组来限制执行的范围,这时,计算只会在集群组内的节点上执行。
Ignite ignite = Ignitition.ignite();
ClusterGroup remoteGroup = ignite.cluster().forRemotes();
// Limit computations only to remote nodes (exclude local node).
IgniteCompute compute = ignite.compute(remoteGroup);
8.2.分布式闭包
Ignite计算网格可以对集群或者集群组内的任何闭包进行广播和负载平衡,包括纯Java的runnables
和callables
。
8.2.1.broadcast方法
所有的broadcast(...)
方法会将一个给定的作业广播到所有的集群节点或者集群组。
Java8广播:
final Ignite ignite = Ignition.ignite();
// Limit broadcast to remote nodes only.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());
// Print out hello message on remote nodes in the cluster group.
compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));
Java8异步广播:
final Ignite ignite = Ignition.ignite();
// Limit broadcast to remote nodes only and
// enable asynchronous mode.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes()).withAsync();
// Print out hello message on remote nodes in the cluster group.
compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));
ComputeTaskFuture<?> fut = compute.future():
fut.listenAsync(f -> System.out.println("Finished sending broadcast job."));
Java7广播:
final Ignite ignite = Ignition.ignite();
// Limit broadcast to rmeote nodes only.
IgniteCompute compute = ignite.compute(ignite.cluster.forRemotes());
// Print out hello message on remote nodes in projection.
compute.broadcast(
new IgniteRunnable() {
@Override public void run() {
// Print ID of remote node on remote node.
System.out.println(">>> Hello Node: " + ignite.cluster().localNode().id());
}
}
);
Java7异步广播:
final Ignite ignite = Ignition.ignite();
// Limit broadcast to remote nodes only and
// enable asynchronous mode.
IgniteCompute compute = ignite.compute(ignite.cluster.forRemotes()).withAsync();
// Print out hello message on remote nodes in the cluster group.
compute.broadcast(
new IgniteRunnable() {
@Override public void run() {
// Print ID of remote node on remote node.
System.out.println(">>> Hello Node: " + ignite.cluster().localNode().id());
}
}
);
ComputeTaskFuture<?> fut = compute.future():
fut.listenAsync(new IgniteInClosure<? super ComputeTaskFuture<?>>() {
public void apply(ComputeTaskFuture<?> fut) {
System.out.println("Finished sending broadcast job to cluster.");
}
});
8.2.2.call和run方法
所有的call(...)
和run(...)
方法都可以在集群或者集群组内既可以执行单独的作业也可以执行作业的集合。
Java8:call:
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
// Iterate through all words in the sentence and create callable jobs.
for (String word : "How many characters".split(" "))
calls.add(word::length);
// Execute collection of callables on the cluster.
Collection<Integer> res = ignite.compute().call(calls);
// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();
Java8:run:
IgniteCompute compute = ignite.compute();
// Iterate through all words and print
// each word on a different cluster node.
for (String word : "Print words on different cluster nodes".split(" "))
// Run on some cluster node.
compute.run(() -> System.out.println(word));
Java8:异步call:
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
// Iterate through all words in the sentence and create callable jobs.
for (String word : "Count characters using callable".split(" "))
calls.add(word::length);
// Enable asynchronous mode.
IgniteCompute asyncCompute = ignite.compute().withAsync();
// Asynchronously execute collection of callables on the cluster.
asyncCompute.call(calls);
asyncCompute.future().listenAsync(fut -> {
// Total number of characters.
int total = fut.get().stream().mapToInt(Integer::intValue).sum();
System.out.println("Total number of characters: " + total);
});
Java8:异步run:
IgniteCompute asyncCompute = ignite.compute().withAsync();
Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();
// Iterate through all words and print
// each word on a different cluster node.
for (String word : "Print words on different cluster nodes".split(" ")) {
// Asynchronously run on some cluster node.
asyncCompute.run(() -> System.out.println(word));
futs.add(asyncCompute.future());
}
// Wait for completion of all futures.
futs.stream().forEach(ComputeTaskFuture::get);
Java7:call:
Collection<IgniteCallable<Integer>> calls = new ArrayList<>();
// Iterate through all words in the sentence and create callable jobs.
for (final String word : "Count characters using callable".split(" ")) {
calls.add(new IgniteCallable<Integer>() {
@Override public Integer call() throws Exception {
return word.length(); // Return word length.
}
});
}
// Execute collection of callables on the cluster.
Collection<Integer> res = ignite.compute().call(calls);
int total = 0;
// Total number of characters.
// Looks much better in Java 8.
for (Integer i : res)
total += i;
Java7:异步run:
IgniteCompute asyncCompute = ignite.compute().withAsync();
Collection<ComputeTaskFuture<?>> futs = new ArrayList<>();
// Iterate through all words and print
// each word on a different cluster node.
for (String word : "Print words on different cluster nodes".split(" ")) {
// Asynchronously run on some cluster node.
asyncCompute.run(new IgniteRunnable() {
@Override public void run() {
System.out.println(word);
}
});
futs.add(asyncCompute.future());
}
// Wait for completion of all futures.
for (ComputeTaskFuture<?> f : futs)
f.get();
8.2.3.apply方法
闭包是一个代码块,他是把代码体包装起来然后以一个函数对象的形式在内部使用外部的变量,然后可以在任何地方传递这样一个函数对象,传入一个变量然后执行。所有的apply方法都可以在集群内执行闭包。 Java8:apply:
IgniteCompute compute = ignite.compute();
// Execute closure on all cluster nodes.
Collection<Integer> res = compute.apply(
String::length,
Arrays.asList("How many characters".split(" "))
);
// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();
Java8:异步apply:
// Enable asynchronous mode.
IgniteCompute asyncCompute = ignite.compute().withAsync();
// Execute closure on all cluster nodes.
// If the number of closures is less than the number of
// parameters, then Ignite will create as many closures
// as there are parameters.
Collection<Integer> res = asyncCompute.apply(
String::length,
Arrays.asList("How many characters".split(" "))
);
asyncCompute.future().listenAsync(fut -> {
// Total number of characters.
int total = fut.get().stream().mapToInt(Integer::intValue).sum();
System.out.println("Total number of characters: " + total);
});
Java7:apply:
// Execute closure on all cluster nodes.
Collection<Integer> res = ignite.compute().apply(
new IgniteClosure<String, Integer>() {
@Override public Integer apply(String word) {
// Return number of letters in the word.
return word.length();
}
},
Arrays.asList("Count characters using closure".split(" "))
);
int sum = 0;
// Add up individual word lengths received from remote nodes
for (int len : res)
sum += len;
8.3.Executor Service
IgniteCompute提供了一个方便的API以在集群内执行计算。虽然也可以直接使用JDK提供的标准ExecutorService
接口,但是Ignite还提供了一个ExecutorService
接口的分布式实现然后可以在集群内自动以负载平衡的模式执行所有计算。该计算具有容错性以及保证只要有一个节点处于活动状态就能保证计算得到执行,可以将其视为一个分布式的集群化线程池。
// Get cluster-enabled executor service.
ExecutorService exec = ignite.executorService();
// Iterate through all words in the sentence and create jobs.
for (final String word : "Print words using runnable".split(" ")) {
// Execute runnable on some node.
exec.submit(new IgniteRunnable() {
@Override public void run() {
System.out.println(">>> Printing '" + word + "' on this node from grid job.");
}
});
也可以限制作业在一个集群组中执行:
// Cluster group for nodes where the attribute 'worker' is defined.
ClusterGroup workerGrp = ignite.cluster().forAttribute("ROLE", "worker");
// Get cluster-enabled executor service for the above cluster group.
ExecutorService exec = ignite.executorService(workerGrp);
8.4.MapReduce和ForkJoin
ComputeTask
是Ignite对于简化内存内MapReduce的抽象,这个也非常接近于ForkJoin范式,纯粹的MapReduce从来不是为了性能而设计,只适用于处理离线的批量业务处理(比如Hadoop MapReduce)。然而,当对内存内的数据进行计算时,实时性低延迟和高吞吐量通常具有更高的优先级。同样,简化API也变得非常重要。考虑到这一点,Ignite推出了ComputeTask
API,它是一个轻量级的MapReduce(或ForkJoin)实现。
只有当需要对作业到节点的映射做细粒度控制或者对故障转移进行定制的时候,才使用
ComputeTask
。对于所有其他的场景,都需要使用8.2.分布式闭包
中介绍的集群内闭包执行来实现。
8.4.1.ComputeTask
ComputeTask
定义了要在集群内执行的作业以及这些作业到节点的映射,他还定义了如何处理作业的返回值(Reduce)。所有的IgniteCompute.execute(...)
方法都会在集群上执行给定的任务,应用只需要实现ComputeTask
接口的map(...)
和reduce(...)
方法即可。
任务是通过实现ComputeTask
接口的2或者3个方法定义的。
map方法
map(...)
方法将作业实例化然后将他们映射到工作节点,这个方法收到任务要运行的集群节点的集合还有任务的参数,该方法会返回一个map,作业为键,映射的工作节点为值。然后作业会被发送到工作节点上并且在那里执行。
关于
map(...)
方法的简化实现,可以参照下面的ComputeTaskSplitAdapter
。
result方法
result(...)
方法在每次作业在集群节点上执行时都会被调用,它接收计算作业返回的结果,以及迄今为止收到的作业结果的列表。该方法会返回一个ComputeJobResultPolicy
的实例,说明下一步要做什么。
WAIT
:等待所有剩余的作业完成(如果有的话)REDUCE
:立即进入Reduce阶段,丢弃剩余的作业和还未收到的结果FAILOVER
:将作业转移到另一个节点(参照8.7.容错
章节),所有已经收到的作业结果也会在reduce(...)
方法中有效
reduce方法
当所有作业完成后(或者从result(...)
方法返回REDUCE结果策略),reduce(...)
方法在Reduce阶段被调用。该方法接收到所有计算结果的一个列表然后返回一个最终的计算结果。
8.4.2.计算任务适配器
定义计算时每次都实现ComputeTask
的所有三个方法并不是必须的,有一些帮助类使得只需要描述一个特定的逻辑片段即可,剩下的交给Ignite自动处理。
ComputeTaskAdapter
ComputeTaskAdapter
定义了一个默认的result(...)
方法实现,他在当一个作业抛出异常时返回一个FAILOVER
策略,否则会返回一个WAIT
策略,这样会等待所有的作业完成,并且有结果。
ComputeTaskSplitAdapter
ComputeTaskSplitAdapter
继承了ComputeTaskAdapter
,他增加了将作业自动分配给节点的功能。它隐藏了map(...)
方法然后增加了一个新的split(...)
方法,使得开发者只需要提供一个待执行的作业集合(这些作业到节点的映射会被适配器以负载平衡的方式自动处理)。
这个适配器对于所有节点都适于执行作业的同质化环境是非常有用的,这样的话映射阶段就可以隐式地完成。
8.4.3.ComputeJob
任务触发的所有作业都实现了ComputeJob
接口,这个接口的execute()
方法定义了作业的逻辑然后应该返回一个作业的结果。cancel()
方法定义了当一个作业被丢弃时的逻辑(比如,当任务决定立即进入Reduce阶段或者被取消)。
ComputeJobAdapter
这是一个提供了无操作的cancel()
方法的方便的适配器类。
8.4.4.示例
下面是一个ComputeTask
和ComputeJob
的示例:
ComputeTaskSplitAdapter:
IgniteCompute compute = ignite.compute();
// Execute task on the clustr and wait for its completion.
int cnt = grid.compute().execute(CharacterCountTask.class, "Hello Grid Enabled World!");
System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'.");
/**
* Task to count non-white-space characters in a phrase.
*/
private static class CharacterCountTask extends ComputeTaskSplitAdapter<String, Integer> {
// 1. Splits the received string into to words
// 2. Creates a child job for each word
// 3. Sends created jobs to other nodes for processing.
@Override
public List<ClusterNode> split(List<ClusterNode> subgrid, String arg) {
String[] words = arg.split(" ");
List<ComputeJob> jobs = new ArrayList<>(words.length);
for (final String word : arg.split(" ")) {
jobs.add(new ComputeJobAdapter() {
@Override public Object execute() {
System.out.println(">>> Printing '" + word + "' on from compute job.");
// Return number of letters in the word.
return word.length();
}
});
}
return jobs;
}
@Override
public Integer reduce(List<ComputeJobResult> results) {
int sum = 0;
for (ComputeJobResult res : results)
sum += res.<Integer>getData();
return sum;
}
}
ComputeTaskAdapter:
IgniteCompute compute = ignite.compute();
// Execute task on the clustr and wait for its completion.
int cnt = grid.compute().execute(CharacterCountTask.class, "Hello Grid Enabled World!");
System.out.println(">>> Total number of characters in the phrase is '" + cnt + "'.");
/**
* Task to count non-white-space characters in a phrase.
*/
private static class CharacterCountTask extends ComputeTaskAdapter<String, Integer> {
// 1. Splits the received string into to words
// 2. Creates a child job for each word
// 3. Sends created jobs to other nodes for processing.
@Override
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) {
String[] words = arg.split(" ");
Map<ComputeJob, ClusterNode> map = new HashMap<>(words.length);
Iterator<ClusterNode> it = subgrid.iterator();
for (final String word : arg.split(" ")) {
// If we used all nodes, restart the iterator.
if (!it.hasNext())
it = subgrid.iterator();
ClusterNode node = it.next();
map.put(new ComputeJobAdapter() {
@Override public Object execute() {
System.out.println(">>> Printing '" + word + "' on this node from grid job.");
// Return number of letters in the word.
return word.length();
}
}, node);
}
return map;
}
@Override
public Integer reduce(List<ComputeJobResult> results) {
int sum = 0;
for (ComputeJobResult res : results)
sum += res.<Integer>getData();
return sum;
}
}
8.4.5.分布式任务会话
每个任务执行时都会创建分布式任务会话,他是由ComputeTaskSession
接口定义的。任务会话对于任务和其产生的所有作业都是可见的,因此一个作业或者一个任务设置的属性也可以被其他的作业访问。任务会话也可以在属性设置或者等待属性设置时接收通知。
在任务及其相关的所有作业之间会话属性设置的顺序是一致的,不会出现一个作业发现属性A在属性B之前,而另一个作业发现属性B在属性A之前的情况。
在下面的例子中,让所有的作业在步骤1移动到步骤2之前是同步的:
@ComputeTaskSessionFullSupport注解 注意由于性能的原因分布式任务会话默认是禁用的,要启用的话需要在任务类上加注
@ComputeTaskSessionFullSupport
注解。
IgniteCompute compute = ignite.commpute();
compute.execute(new TaskSessionAttributesTask(), null);
/**
* Task demonstrating distributed task session attributes.
* Note that task session attributes are enabled only if
* @ComputeTaskSessionFullSupport annotation is attached.
*/
@ComputeTaskSessionFullSupport
private static class TaskSessionAttributesTask extends ComputeTaskSplitAdapter<Object, Object>() {
@Override
protected Collection<? extends GridJob> split(int gridSize, Object arg) {
Collection<ComputeJob> jobs = new LinkedList<>();
// Generate jobs by number of nodes in the grid.
for (int i = 0; i < gridSize; i++) {
jobs.add(new ComputeJobAdapter(arg) {
// Auto-injected task session.
@TaskSessionResource
private ComputeTaskSession ses;
// Auto-injected job context.
@JobContextResource
private ComputeJobContext jobCtx;
@Override
public Object execute() {
// Perform STEP1.
...
// Tell other jobs that STEP1 is complete.
ses.setAttribute(jobCtx.getJobId(), "STEP1");
// Wait for other jobs to complete STEP1.
for (ComputeJobSibling sibling : ses.getJobSiblings())
ses.waitForAttribute(sibling.getJobId(), "STEP1", 0);
// Move on to STEP2.
...
}
}
}
}
@Override
public Object reduce(List<ComputeJobResult> results) {
// No-op.
return null;
}
}
8.5.节点局部状态共享
通常来说在不同的计算作业或者不同的部署服务之间共享状态是很有用的,为此Ignite在每个节点上提供了一个共享并发node-local-map。
IgniteCluster cluster = ignite.cluster();
ConcurrentMap<String, Integer> nodeLocalMap = cluster.nodeLocalMap():
节点局部变量类似于线程局部变量,只不过他不是分布式的,他只会保持在本地节点上。节点局部变量可以用于计算任务在不同的执行中共享状态,他也可以用于部署的服务。 作为一个例子,创建一个作业,每次当他在某个节点上执行时都会使节点局部的计数器增加,这样,每个节点的节点局部计数器都会告诉我们一个作业在那个节点上执行了多少次。
private IgniteCallable<Long> job = new IgniteCallable<Long>() {
@IgniteInstanceResource
private Ignite ignite;
@Override
public Long call() {
// Get a reference to node local.
ConcurrentMap<String, AtomicLong> nodeLocalMap = ignite.cluster().nodeLocalMap();
AtomicLong cntr = nodeLocalMap.get("counter");
if (cntr == null) {
AtomicLong old = nodeLocalMap.putIfAbsent("counter", cntr = new AtomicLong());
if (old != null)
cntr = old;
}
return cntr.incrementAndGet();
}
}
现在在同一个节点上执行这个作业2次然后确保计数器的值为2:
ClusterGroup random = ignite.cluster().forRandom();
IgniteCompute compute = ignite.compute(random);
// The first time the counter on the picked node will be initialized to 1.
Long res = compute.call(job);
assert res == 1;
// Now the counter will be incremented and will have value 2.
res = compute.call(job);
assert res == 2;
8.6.计算和数据的并置
计算和数据的并置可以最小化网络中的数据序列化,以及可以显著地提升应用的性能和可扩展性。不管何时,都应该尽力地使计算和缓存着要处理的数据的集群节点并置。
8.6.1.基于关系的call方法和run方法
affinityCall(...)
和affinityRun(...)
方法使作业和缓存着数据的节点位于一处,换句话说,给定缓存名字和关系键,这些方法会试图在指定的缓存中定位键所在的节点,然后在那里执行作业。
Java8:affinityRun:
IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);
IgniteCompute compute = ignite.compute();
for (int key = 0; key < KEY_CNT; key++) {
// This closure will execute on the remote node where
// data with the 'key' is located.
compute.affinityRun(CACHE_NAME, key, () -> {
// Peek is a local memory lookup.
System.out.println("Co-located [key= " + key + ", value= " + cache.localPeek(key) +']');
});
}
Java8:异步affinityRun:
IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);
IgniteCompute asyncCompute = ignite.compute().withAsync();
List<IgniteFuture<?>> futs = new ArrayList<>();
for (int key = 0; key < KEY_CNT; key++) {
// This closure will execute on the remote node where
// data with the 'key' is located.
asyncCompute.affinityRun(CACHE_NAME, key, () -> {
// Peek is a local memory lookup.
System.out.println("Co-located [key= " + key + ", value= " + cache.peek(key) +']');
});
futs.add(asyncCompute.future());
}
// Wait for all futures to complete.
futs.stream().forEach(IgniteFuture::get);
Java7:affinityRun:
final IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);
IgniteCompute compute = ignite.compute();
for (int i = 0; i < KEY_CNT; i++) {
final int key = i;
// This closure will execute on the remote node where
// data with the 'key' is located.
compute.affinityRun(CACHE_NAME, key, new IgniteRunnable() {
@Override public void run() {
// Peek is a local memory lookup.
System.out.println("Co-located [key= " + key + ", value= " + cache.peek(key) +']');
}
});
}
8.7.容错
Ignite支持作业的自动故障转移,当一个节点崩溃时,作业会被转移到其他可用节点再次执行。然而在Ignite中也可以将任何作业的结果认为是失败的。工作的节点可以仍然是存活的,但是他运行在一个很低的CPU,I/O,磁盘空间等资源上,在很多情况下会导致应用的故障然后触发一个故障的转移。此外,也有选择一个作业故障转移到那个节点的功能,因为同一个应用内部不同的程序或者不同的计算也会是不同的。
FailoverSpi
负责选择一个新的节点来执行失败作业。FailoverSpi
检查发生故障的作业以及该作业可以尝试执行的所有可用的网格节点的列表。他会确保该作业不会再次映射到出现故障的同一个节点。故障转移是在ComputeTask.result(...)
方法返回ComputeJobResultPolicy.FAILOVER
策略时触发的。Ignite内置了一些可定制的故障转移SPI实现。
8.7.1.至少一次保证
只要有一个节点是有效的,作业就不会丢失。
默认的话,Ignite会自动对停止或者故障的节点上的所有作业进行故障转移,如果要定制故障转移的行为,需要实现ComputeTask.result()
方法。下面的例子显示了当一个作业抛出任何的IgniteException
(或者它的子类)时会触发故障转移。
public class MyComputeTask extends ComputeTaskSplitAdapter<String, String> {
...
@Override
public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
IgniteException err = res.getException();
if (err != null)
return ComputeJobResultPolicy.FAILOVER;
// If there is no exception, wait for all job results.
return ComputeJobResultPolicy.WAIT;
}
...
}
8.7.2.闭包故障转移
闭包的故障转移是被ComputeTaskAdapter
管理的,它在一个远程节点或者故障或者拒绝执行闭包时被触发。这个默认的行为可以被IgniteCompute.withNoFailover()
方法覆盖,他会创建一个设置了无故障转移标志的IgniteCompute
实例,下面是一个例子:
IgniteCompute compute = ignite.compute().withNoFailover();
compute.apply(() -> {
// Do something
...
}, "Some argument");
8.7.3.AlwaysFailOverSpi
AlwaysFailoverSpi
总是将一个故障的作业路由到另一个节点。注意,首先会尝试将故障的作业路由到该任务还没有被执行过的节点上,如果没有可用的节点,然后会试图将故障的作业路由到可能运行同一个任务中其他的作业的节点上,如果上述的尝试都失败了,那么该作业就不会被故障转移然后会返回一个null。
下面的配置参数可以用于配置AlwaysFailoverSpi
:
setter方法 | 描述 | 默认值 |
---|---|---|
setMaximumFailoverAttempts(int) |
设置尝试将故障作业转移到其他节点的最大次数 | 5 |
XML:
<bean id="grid.custom.cfg" class="org.apache.ignite.IgniteConfiguration" singleton="true">
...
<bean class="org.apache.ignite.spi.failover.always.AlwaysFailoverSpi">
<property name="maximumFailoverAttempts" value="5"/>
</bean>
...
</bean>
Java:
AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
IgniteConfiguration cfg = new IgniteConfiguration();
// Override maximum failover attempts.
failSpi.setMaximumFailoverAttempts(5);
// Override the default failover SPI.
cfg.setFailoverSpi(failSpi);
// Start Ignite node.
Ignition.start(cfg);
8.8.负载平衡
负载平衡组件将作业在集群节点之间平衡分配。Ignite中负载平衡是通过LoadBalancingSpi
获得的。它控制所有节点的负载以及确保集群中的每个节点负载水平均衡。对于同质化环境中的同质化的任务,负载平衡采用的是随机或者循环的策略。然而在很多其他场景中,特别是在一些不均匀的负载下,就需要更复杂的自适应负载平衡策略。
数据并置 注意,当作业还没有与数据并置或者还没有在哪个节点上执行的倾向时,负载平衡就已经触发了。如果使用了数据和计算的并置,那么数据的并置优先于负载平衡。
8.8.1.循环式负载平衡
RoundRobinLoadBalancingSpi
以循环的方式在节点间迭代,然后选择下一个连续的节点。支持两种操作模式:每任务以及全局。
每任务模式 如果配置成每任务模式,当任务开始执行时实现会随机地选择一个节点,然后会顺序地迭代网络中所有的节点,对于拆分的大小等同于节点的数量时这是默认的配置,这个模式保证所有的节点都会参与拆分。
全局模式 如果配置成全局模式,对于所有的任务都会维护一个节点的单一连续队列然后每次都会从队列中选择一个节点。这个模式中(不像每任务模式),当多个任务并发执行时,即使拆分大小等同于节点的数量,同一个任务的某些作业仍然可能被赋予同一个节点。 XML:
<bean id="grid.custom.cfg" class="org.apache.ignite.IgniteConfiguration" singleton="true">
...
<property name="loadBalancingSpi">
<bean class="org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi">
<!-- Set to per-task round-robin mode (this is default behavior). -->
<property name="perTask" value="true"/>
</bean>
</property>
...
</bean>
Java:
RoundRobinLoadBalancingSpi = new RoundRobinLoadBalancingSpi();
// Configure SPI to use per-task mode (this is default behavior).
spi.setPerTask(true);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default load balancing SPI.
cfg.setLoadBalancingSpi(spi);
// Start Ignite node.
Ignition.start(cfg);
8.8.2.随机或者加权负载平衡
WeightedRandomLoadBalancingSpi
默认会为作业选择一个随机的节点。也可以选择为节点赋予权值,这样的话有更高权重的节点最终会使将作业分配给他的机会更多。默认的话所有节点的权重都是10。
XML:
<bean id="grid.custom.cfg" class="org.apache.ignite.IgniteConfiguration" singleton="true">
...
<property name="loadBalancingSpi">
<bean class="org.apache.ignite.spi.loadbalancing.weightedrandom.WeightedRandomLoadBalancingSpi">
<property name="useWeights" value="true"/>
<property name="nodeWeight" value="10"/>
</bean>
</property>
...
</bean>
Java:
WeightedRandomLoadBalancingSpi = new WeightedRandomLoadBalancingSpi();
// Configure SPI to used weighted random load balancing.
spi.setUseWeights(true);
// Set weight for the local node.
spi.setWeight(10);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default load balancing SPI.
cfg.setLoadBalancingSpi(spi);
// Start Ignite node.
Ignition.start(cfg);
8.9.检查点
检查点提供了保存一个作业中间状态的能力,他有助于一个长期运行的作业保存一些中间状态以防节点故障。重启一个故障节点后,一个作业会从保存的检查点载入然后从故障处继续执行。对于作业检查点状态,唯一必要的就是实现java.io.Serializable
接口。
检查点功能可以通过GridTaskSession
接口的如下方法启用:
ComputeTaskSession.loadCheckpoint(String)
ComputeTaskSession.removeCheckpoint(String)
ComputeTaskSession.saveCheckpoint(String, Object)
@ComputeTaskSessionFullSupport注解 注意检查点因为性能的原因默认是禁用的,要启用它需要在任务或者闭包类上加注
@ComputeTaskSessionFullSupport
注解。
8.9.1.主节点故障保护
检查点的一个重要使用场景是避免“主”节点(开启了原来的执行的节点)的故障是不容易的。当主节点故障时,Ignite不知道将作业的执行结果发送给谁,这样的话结果就会被丢弃。 这种情况下要恢复,可以先将作业的最终执行结果保存为一个检查点然后在”主”节点故障时有一个逻辑来重新运行整个任务。这时任务的重新运行会非常快因为所有的作业都可以从已保存的检查点启动。
8.9.2.设置检查点
每个计算任务都可以通过调用ComputeTaskSession.saveCheckpoint(...)
方法定期地保存检查点。
如果作业确实保存了检查点,那么当它开始执行的时候,应该检查检查点是否可用然后从最后保存的检查点处开始执行。
IgniteCompute compute = ignite.compute();
compute.run(new CheckpointsRunnable());
/**
* Note that this class is annotated with @ComputeTaskSessionFullSupport
* annotation to enable checkpointing.
*/
@ComputeTaskSessionFullSupport
private static class CheckpointsRunnable implements IgniteRunnable() {
// Task session (injected on closure instantiation).
@TaskSessionResource
private ComputeTaskSession ses;
@Override
public Object applyx(Object arg) throws GridException {
// Try to retrieve step1 result.
Object res1 = ses.loadCheckpoint("STEP1");
if (res1 == null) {
res1 = computeStep1(arg); // Do some computation.
// Save step1 result.
ses.saveCheckpoint("STEP1", res1);
}
// Try to retrieve step2 result.
Object res2 = ses.loadCheckpoint("STEP2");
if (res2 == null) {
res2 = computeStep2(res1); // Do some computation.
// Save step2 result.
ses.saveCheckpoint("STEP2", res2);
}
...
}
}
8.9.3.CheckpointSpi
Ignite中,检查点功能是通过CheckpointSpi
提供的,他有如下开箱即用的实现:
|类|描述|
|---|---|
|SharedFsCheckpointSpi
默认|这个实现通过一个共享的文件系统来保存检查点|
|CacheCheckpointSpi
|这个实现通过缓存来保存检查点|
|JdbcCheckpointSpi
|这个实现通过数据库来保存检查点|
|S3CheckpointSpi
|这个实现通过Amazon S3来保存检查点|
CheckpointSpi
是在IgniteConfiguration
中提供的然后在启动时传递给Ignition类。
9.9.4.文件系统检查点配置
下面的配置参数可用于配置SharedFsCheckpointSpi
:
|settter方法|描述|默认值|
|---|---|---|
|setDirectoryPaths(Collection)
|设置检查点要保存的共享文件夹的目录路径。这个路径既可以是绝对的也可以是相对于IGNITE_HOME
环境变量或者系统参数指定的路径|IGNITE_HOME/work/cp/sharedfs
|
XML:
<bean class="org.apache.ignite.IgniteConfiguration" singleton="true">
...
<property name="checkpointSpi">
<bean class="org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi">
<!-- Change to shared directory path in your environment. -->
<property name="directoryPaths">
<list>
<value>/my/directory/path</value>
<value>/other/directory/path</value>
</list>
</property>
</bean>
</property>
...
</bean>
Java:
IgniteConfiguration cfg = new IgniteConfiguration();
SharedFsCheckpointSpi checkpointSpi = new SharedFsCheckpointSpi();
// List of checkpoint directories where all files are stored.
Collection<String> dirPaths = new ArrayList<String>();
dirPaths.add("/my/directory/path");
dirPaths.add("/other/directory/path");
// Override default directory path.
checkpointSpi.setDirectoryPaths(dirPaths);
// Override default checkpoint SPI.
cfg.setCheckpointSpi(checkpointSpi);
// Starts Ignite node.
Ignition.start(cfg);
8.9.5.缓存检查点配置
CacheCheckpointSpi
对于检查点SPI来说是一个基于缓存的实现,检查点数据会存储于Ignite数据网格中的一个预定义缓存中。
下面的配置参数可用于配置CacheCheckpointSpi
:
|setter方法|描述|默认值|
|---|---|---|
|setCacheName(String)
|设置用于保存检查点的缓存的名字|checkpoints
|
8.9.6.数据库检查点配置
JdbcCheckpointSpi
通过数据库来保存检查点。所有的检查点都会保存在数据库表中然后对于集群中的所有节点都是可用的。注意每个节点必须访问数据库。一个作业状态可以在一个节点保存然后在另一个节点载入(例如一个作业在节点故障后被另一个节点取代)。
下面的配置参数可用于配置JdbcCheckpointSpi
,(所有的都是可选的)
setter方法 | 描述 | 默认值 |
---|---|---|
setDataSource(DataSource) |
设置用于数据库访问的数据源 | 无 |
setCheckpointTableName(String) |
设置检查点表名 | CHECKPOINTS |
setKeyFieldName(String) |
设置检查点键字段名 | NAME |
setKeyFieldType(String) |
设置检查点键字段类型,字段应该有相应的SQL字符串类型(比如VARCHAR ) |
VARCHAR(256) |
setValueFieldName(String) |
设置检查点值字段名 | VALUE |
setValueFieldType(String) |
设置检查点值字段类型,注意,字段需要有相应的SQL BLOB类型,默认值是BLOB,但不是所有数据库都兼容,比如,如果使用HSQL DB,那么类型应该为longvarbinary |
BLOB |
setExpireDateFieldName(String) |
设置检查点过期时间字段名 | EXPIRE_DATE |
setExpireDateFieldType(String) |
设置检查点过期时间字段类型,字段应该有相应的DATETIME 类型 |
DATETIME |
setNumberOfRetries(int) |
任何数据库错误时的重试次数 | 2 |
setUser(String) |
设置检查点数据库用户名,注意只有同时设置了用户名和密码时,认证才会执行 | 无 |
setPassword(String) |
设置检查点数据库密码 | 无 |
Apache DBCP Apache DBCP项目对于数据源和连接池提供了各种封装,可以通过Spring配置文件或者代码以spring bean的形式使用这些封装类来配置这个SPI,可以参照Apache DBCP来获得更多的信息。 XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
...
<property name="checkpointSpi">
<bean class="org.apache.ignite.spi.checkpoint.database.JdbcCheckpointSpi">
<property name="dataSource">
<ref bean="anyPoolledDataSourceBean"/>
</property>
<property name="checkpointTableName" value="CHECKPOINTS"/>
<property name="user" value="test"/>
<property name="password" value="test"/>
</bean>
</property>
...
</bean>
Java:
JdbcCheckpointSpi checkpointSpi = new JdbcCheckpointSpi();
javax.sql.DataSource ds = ... // Set datasource.
// Set database checkpoint SPI parameters.
checkpointSpi.setDataSource(ds);
checkpointSpi.setUser("test");
checkpointSpi.setPassword("test");
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default checkpoint SPI.
cfg.setCheckpointSpi(checkpointSpi);
// Start Ignite node.
Ignition.start(cfg);
8.9.7.Amazon S3 检查点配置
S3CheckpointSpi
使用S3存储来保存检查点。要了解有关Amazon S3的信息可以参考http://aws.amazon.com/。
下面的参数可用于配置S3CheckpointSpi
:
setter方法 | 描述 | 默认值 |
---|---|---|
setAwsCredentials(AWSCredentials) |
设置要使用的AWS凭证来保存检查点 | 无,但必须提供 |
setClientConfiguration(Client) |
设置AWS客户端配置 | 无 |
setBucketNameSuffix(String) |
设置bucket名字后缀 | default-bucket |
XML:
<bean class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
...
<property name="checkpointSpi">
<bean class="org.apache.ignite.spi.checkpoint.s3.S3CheckpointSpi">
<property name="awsCredentials">
<bean class="com.amazonaws.auth.BasicAWSCredentials">
<constructor-arg value="YOUR_ACCESS_KEY_ID" />
<constructor-arg value="YOUR_SECRET_ACCESS_KEY" />
</bean>
</property>
</bean>
</property>
...
</bean>
Java:
IgniteConfiguration cfg = new IgniteConfiguration();
S3CheckpointSpi spi = new S3CheckpointSpi();
AWSCredentials cred = new BasicAWSCredentials(YOUR_ACCESS_KEY_ID, YOUR_SECRET_ACCESS_KEY);
spi.setAwsCredentials(cred);
spi.setBucketNameSuffix("checkpoints");
// Override default checkpoint SPI.
cfg.setCheckpointSpi(cpSpi);
// Start Ignite node.
Ignition.start(cfg);
8.10.作业调度
Ignite中,作业是在客户端侧的任务拆分初始化或者闭包执行阶段被映射到集群节点上的。然而,一旦作业到达被分配的节点,他们需要有序地执行。默认情况下,作业被提交到一个线程池然后随机地执行,如果要对作业执行顺序进行细粒度控制的话,需要启用CollisionSpi
。
8.10.1.FIFO排序
FifoQueueCollisionSpi
可以使一定数量的作业无中断地以先入先出的顺序执行,所有其他的作业都会被放入一个等待列表,直到轮到他。
并行作业的数量是由parallelJobsNumber
配置参数控制的,默认值为2.
一次一个
注意如果将parallelJobsNumber
设置为1,可以保证所有作业同时只会执行一个,这样的话没有任何两个作业并发执行。
XML:
<bean class="org.apache.ignite.IgniteConfiguration" singleton="true">
...
<property name="collisionSpi">
<bean class="org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi">
<!-- Execute one job at a time. -->
<property name="parallelJobsNumber" value="1"/>
</bean>
</property>
...
</bean>
Java:
FifoQueueCollisionSpi colSpi = new FifoQueueCollisionSpi();
// Execute jobs sequentially, one at a time,
// by setting parallel job number to 1.
colSpi.setParallelJobsNumber(1);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default collision SPI.
cfg.setCollisionSpi(colSpi);
// Start Ignite node.
Ignition.start(cfg);
8.10.2.优先级排序
PriorityQueueCollisionSpi
可以为每个作业设置一个优先级,因此高优先级的作业会比低优先级的作业先执行。
8.10.3.任务优先级
任务优先级是通过任务会话中的grid.task.priority
属性设置的,如果任务没有被赋予优先级,那么会使用默认值0。
下面是一个如何设置任务优先级的示例:
public class MyUrgentTask extends ComputeTaskSplitAdapter<Object, Object> {
// Auto-injected task session.
@TaskSessionResource
private GridTaskSession taskSes = null;
@Override
protected Collection<ComputeJob> split(int gridSize, Object arg) {
...
// Set high task priority.
taskSes.setAttribute("grid.task.priority", 10);
List<ComputeJob> jobs = new ArrayList<>(gridSize);
for (int i = 1; i <= gridSize; i++) {
jobs.add(new GridJobAdapter() {
...
});
}
...
// These jobs will be executed with higher priority.
return jobs;
}
}
和FIFO排序一样,并行执行作业的数量是由parallelJobsNumber
配置参数控制的。
配置
XML:
<bean class="org.apache.ignite.IgniteConfiguration" singleton="true">
...
<property name="collisionSpi">
<bean class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi">
<!--
Change the parallel job number if needed.
Default is number of cores times 2.
-->
<property name="parallelJobsNumber" value="5"/>
</bean>
</property>
...
</bean>
Java:
PriorityQueueCollisionSpi colSpi = new PriorityQueueCollisionSpi();
// Change the parallel job number if needed.
// Default is number of cores times 2.
colSpi.setParallelJobsNumber(5);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default collision SPI.
cfg.setCollisionSpi(colSpi);
// Start Ignite node.
Ignition.start(cfg);
8.11.任务部署
除了对等类加载之外,Ignite还有一个部署机制,他负责在运行时从不同的源中部署任务和类。
8.11.1.DeploymentSpi
部署的功能是通过DeploymentSpi
接口提供的。
类加载器负责加载任务类(或者其他的类),他可以通过调用register(ClassLoader, Class)
方法或者SPI自己来直接部署。比如通过异步地扫描一些文件夹来加载新的任务。当系统调用了findResource(String)
方法时,SPI必须返回一个与给定的类相对应的类加载器。每次一个类加载器获得(再次)部署或者释放,SPI必须调用DeploymentListener.onUnregistered(ClassLoader)
回调。
如果启用了对等类加载,因为通常只会在一个网格节点上部署类加载器,一旦一个任务在集群上开始执行,所有其他的节点都会从任务初始执行的节点自动加载所有的任务类。对等类加载也支持热部署。每次任务发生变化或者在一个节点上重新部署,所有其他的节点都会侦测到然后重新部署这个任务。注意对等类加载只在任务为非本地部署时才生效,否则本地部署总是具有优先权。
Ignite提供了如下的开箱即用的DeploymentSpi
实现:
- UriDeploymentSpi
- LocalDeploymentSpi
SPI的方法不要直接使用。SPI提供了子系统的内部视图以及由Ignite内部使用。在很少的情况下可能需要访问这个SPI的特定实现,可以通过
Ignite.configuration()
方法来获得这个SPI的一个实例,来检查他的配置属性或者调用其他的非SPI方法。再次注意从获得的实例中调用接口的方法可能导致未定义的行为而且明确不会得到支持。
8.11.2.UriDeploymentSpi
这是DeploymentSpi
的一个实现,它可以从不同的资源部署任务,比如文件系统文件夹、email和FTP。在集群中有不同的方式来部署任务以及每个部署方法都会依赖于所选的源协议。这个SPI可以配置与一系列的URI一起工作,每个URI都包括有关协议/传输加上配置参数比如凭证、扫描频率以及其他的所有数据。
当SPI建立一个与URI的连接时,为了防止扫描过程中的任何变化他会下载待部署的单元到一个临时目录,通过方法setTemporaryDirectoryPath(String)
可以为下载的部署单元设置自定义的临时文件夹。SPI会在该路径下创建一个和本地节点ID名字完全一样的文件夹。
SPI会跟踪每个给定URI的所有变化。这意味着如果任何文件发生变化或者被删除,SPI会重新部署或者删除相应的任务。注意第一个调用findResource(String)
是阻塞的,直到SPI至少一次完成了对所有URI的扫描。
下面是一些支持的可部署单元的类型:
- GAR文件
- 带有未解压GAR文件结构的本地磁盘文件夹
- 只包含已编译的Java类的本地磁盘文件夹
GAR文件 GAR文件是一个可部署的单元,GAR文件基于ZLIB压缩格式,类似简单JAR文件,他的结构类似于WAR包,GAR文件有一个“.gar”扩展名。 GAR文件结构(以.gar结尾的文件或者目录):
META-INF/
|
- ignite.xml
- ...
lib/
|
-some-lib.jar
- ...
xyz.class
...
META-INF
:包含任务描述的ignite.xml
文件的入口,任务描述XML格式文件的作用是指定所有要部署的任务。这个文件是标准的Spring格式XML定义文件,META-INF/
也可以包含其他所有的JAR格式指定的文件。lib/
:包含所有库依赖的入口。- 编译过的java类必须放在GAR文件的低一层。
没有描述文件GAR文件也可以部署。如果没有描述文件,SPI会扫描包里的所有类然后实例化其中实现ComputeTask
接口的。那样的话,所有的任务类必须有一个公开的无参数的构造函数。创建任务时为了方便可以使用ComputeTaskAdapter
适配器。
默认的话,所有下载的GAR文件在META-INF
文件夹都要有待认证的数字签名,然后只有在签名有效时才会被部署。
代码示例
下面的实例演示了可用的SPI是如何部署的,不同的协议也可以一起使用。
File协议:
// The example expects that you have a GAR file in
// `home/username/ignite/work/my_deployment/file` folder
// which contains `myproject.HelloWorldTask` class.
IgniteConfiguration cfg = new IgniteConfiguration();
DeploymentSpi deploymentSpi = new UriDeploymentSpi();
deploymentSpi.setUriList(Arrays.asList("file:///home/username/ignite/work/my_deployment/file"));
cfg.setDeploymentSpi(deploymentSpi);
try(Ignite ignite = Ignition.start(cfg)) {
ignite.compute().execute("myproject.HelloWorldTask", "my args");
}
HTTP协议:
// The example expects that you have a HTMP under
// 'www.mysite.com:110/ignite/deployment'page which contains a link
// on GAR file which contains `myproject.HelloWorldTask` class.
IgniteConfiguration cfg = new IgniteConfiguration();
DeploymentSpi deploymentSpi = new UriDeploymentSpi();
deploymentSpi.setUriList(Arrays.asList("http://username:password;[email protected]:110/ignite/deployment"));
cfg.setDeploymentSpi(deploymentSpi);
try(Ignite ignite = Ignition.start(cfg)) {
ignite.compute().execute("myproject.HelloWorldTask", "my args");
}
XML配置
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="deploymentSpi">
<bean class="org.apache.ignite.grid.spi.deployment.uri.UriDeploymentSpi">
<property name="temporaryDirectoryPath" value="c:/tmp/grid"/>
<property name="uriList">
<list>
<value>http://www.site.com/tasks</value>
<value>file://freq=20000@localhost/c:/Program files/gg-deployment</value>
</list>
</property>
</bean>
</property>
</bean>
配置
|属性|描述|可选|默认值|
|---|---|---|---|
|uriList
|SPI扫描新任务的URI列表|是|file://${IGNITE_HOME}/work/deployment/file
,注意要使用默认文件夹的话,IGNITE_HOME
必须设置。|
|scanners
|用于部署资源的UriDeploymentScanner
实现的数组|是|UriDeploymentFileScanner
和UriDeploymentHttpScanner
|
|temporaryDirectoryPath
|要扫描的GAR文件和目录要拷贝的目标临时目录路径|是|java.io.tmpdir
系统属性值|
|encodeUri
|控制URI中路径部分编码的标志|是|true
|
协议 SPI开箱即用地支持如下协议:
- file:// - File协议
- http:// - HTTP协议
- https:// - 安全HTTP协议
自定义协议 如果需要可以增加其他的协议的支持,可以通过实现
UriDeploymentScanner
接口然后通过setScanners(UriDeploymentScanner... scanners)
方法将实现加入SPI来做到这一点。
除了SPI配置参数之外,对于选择的URI所有必要的配置参数都应该在URI中定义。不同的协议有不同的配置参数,下面分别做了描述,参数是以分号分割的。
File
对于这个协议,SPI会在文件系统中扫描URI指定的文件夹然后从URI定义的源文件夹中下载任何的GAR文件或者目录中的以.gar结尾的文件。
支持如下的参数:
|参数|描述|可选|默认值|
|---|---|---|---|
|freq
|扫描频率,以毫秒为单位|是|5000ms
|
File URI示例
下面的示例会在本地每2000毫秒扫描c:/Program files/ignite/deployment
文件夹。注意如果path有空格,setEncodeUri(boolean)
参数必须设置为true
(这也是默认的行为)。
file://freq=2000@localhost/c:/Program files/ignite/deployment
HTTP/HTTPS
URI部署扫描器会试图读取指向的HTML文件的DOM然后解析出所有的<a>
标签的href属性 - 这会成为要部署的URL集合(到GAR文件):每个'A'链接都应该是指向GAR文件的URL。很重要的是只有HTTP扫描器会使用URLConnection.getLastModified()
方法来检查自从重新部署之前对每个GAR文件的最后一次迭代以来是否有任何变化。
支持如下的参数:
|参数|描述|可选|默认值|
|---|---|---|---|
|freq
|扫描频率,以毫秒为单位|是|300000ms
|
HTTP URI示例
下面的示例会下载www.mysite.com/ignite/deployment
页面,解析他然后每10000
毫秒使用username:password
进行认证,对从页面的所有a元素的href属性指定的所有GAR文件进行下载和部署。
http://username:password;[email protected]:110/ignite/deployment
8.11.3.LocalDeploymentSpi
本地部署SPI只是通过register(ClassLoader, Class)
方法实现了在本地节点中的虚拟机进行部署,这个SPI不需要配置。
显式地配置LocalDeploymentSpi
是没有意义的,因为他是默认的以及没有配置参数。