Ignite.NET插件示例:分布式Semaphore(信号量)

Ignite.NET从2.0版本开始,引入了插件系统,插件能够仅在于.NET环境中,也能够在于.NET + Java混合环境中,本文会描述如何在后者实现插件。html

为何须要插件?

Ignite.NET构建于Ignite(用Java编写)之上,JVM会在.NET进程中启动,.NET部分与Java部分进行交互,并尽量重用现有的Ignite功能。java

插件系统将此平台交互机制公开给第三方,主要场景之一是在.NET中可使用Ignite和第三方Java API。git

这种API的一个典型事例是IgniteSemaphore,该功能在Ignite.NET中尚不可用。github

分布式Semaphore API

Ignite中的Semaphore相似于System.Threading.SemaphoreMSDN),可是是在整个集群中生效的,限制在全部Ignite节点上执行指定代码段的线程数。apache

代码大体以下:分布式

IIgnite ignite = Ignition.GetIgnite();
ISemaphore semaphore = ignite.GetOrCreateSemaphore(name: "foo", count: 3);

semaphore.WaitOne();  // Enter the semaphore (may block)
// Do work
semaphore.Release();

看起来很简单并且很是有用,与.NET内置的SemaphoreAPI相同。显然不能更改IIgnite的接口,所以GetOrCreateSemaphore就是一个扩展点,下面会详细描述。ide

Java插件

先看Java端,这里须要一种调用Ignite.semaphore()的方法并向.NET平台提供访问该实例的方法。测试

建立一个Java项目并经过Maven引用Ignite(具体内容请参见构建多平台Ignite集群文章)。ui

每一个插件都以PluginConfiguration开始,本例的插件不须要任何配置属性,可是该类必须存在,所以只需建立一个简单的类便可:this

public class IgniteNetSemaphorePluginConfiguration implements PluginConfiguration {}

而后是插件的入口:PluginProvider<PluginConfiguration>。该接口有不少方法,可是大多数方法均可觉得空(nameversion不能为空,所以须要为其赋值)。这里只需关注initExtensions方法,它是跨平台互操做的入口点,本例中作的就是注册PlatformPluginExtension实现:

public class IgniteNetSemaphorePluginProvider implements PluginProvider<IgniteNetSemaphorePluginConfiguration> {
    public String name() { return "DotNetSemaphore"; }
    public String version() { return "1.0"; }

    public void initExtensions(PluginContext pluginContext, ExtensionRegistry extensionRegistry)
            throws IgniteCheckedException {
        extensionRegistry.registerExtension(PlatformPluginExtension.class,
                new IgniteNetSemaphorePluginExtension(pluginContext.grid()));
    }
...
}

PlatformPluginExtension有一个惟一的id,用于从.NET端访问它,还有一个PlatformTarget createTarget()方法,用于建立能够从.NET端访问的对象。

Java中的PlatformTarget会映射到.NET中的IPlatformTarget接口,当在.NET中调用IPlatformTarget.InLongOutLong时,就会调用Java实现中的PlatformTarget.processInLongOutLong。还有许多其余方法能够用于交换基本类型、序列化数据和对象。每一个方法都有一个指定了操做代码的type参数,以防插件上有不少不一样的方法。

本例中须要两个PlatformTarget类:一个表明整个插件并具备getOrCreateSemaphore方法,另外一个表明每一个特定信号量。第一个应该持有字符串类型的名称和整型的计数器并返回一个对象,所以须要实现PlatformTarget.processInStreamOutObject,其余方法都不须要能够将其置空:

public class IgniteNetPluginTarget implements PlatformTarget {
    private final Ignite ignite;

    public IgniteNetPluginTarget(Ignite ignite) {
        this.ignite = ignite;
    }

    public PlatformTarget processInStreamOutObject(int i, BinaryRawReaderEx binaryRawReaderEx) throws IgniteCheckedException {
        String name = binaryRawReaderEx.readString();
        int count = binaryRawReaderEx.readInt();

        IgniteSemaphore semaphore = ignite.semaphore(name, count, true, true);

        return new IgniteNetSemaphore(semaphore);
    }
...
}

.NET中的每一个ISemaphore对象在Java中都会有一个对应的IgniteNetSemaphore,它也是一个PlatformTarget。这个对象将处理WaitOneRelease方法,并将它们委托给底层的IgniteSemaphore对象。因为这两个方法都是返回void且是无参数的,所以最简单的PlatformTarget是:

public long processInLongOutLong(int i, long l) throws IgniteCheckedException {
    if (i == 0) semaphore.acquire();
    else semaphore.release();

    return 0;
}

这样Java部分就完成了!建立resources\META-INF.services\org.apache.ignite.plugin.PluginProvider文件,内容为类名,Java服务加载器就能够加载该类。使用Maven打包该项目(在终端中执行mvn package或使用IDE)后,target目录中就应该有一个IgniteNetSemaphorePlugin-1.0-SNAPSHOT.jar文件。

.NET插件

首先建立一个控制台项目,安装Ignite NuGet软件包,并以刚刚建立的jar文件的路径启动Ignite:

var cfg = new IgniteConfiguration
{
    JvmClasspath = @"..\..\..\..\Java\target\IgniteNetSemaphorePlugin-1.0-SNAPSHOT.jar"
};

Ignition.Start(cfg);

Ignite节点启动后就能够在日志中看到插件的名称:

[16:02:38] Configured plugins:
[16:02:38]   ^-- DotNetSemaphore 1.0

对于.NET部分将采用API优先的方法:首先实现扩展方法,而后从那里继续。

public static class IgniteExtensions
{
    public static Semaphore GetOrCreateSemaphore(this IIgnite ignite, string name, int count)
    {
        return ignite.GetPlugin<SemaphorePlugin>("semaphorePlugin").GetOrCreateSemaphore(name, count);
    }
}

为了使该GetPlugin方法生效,须要配置IgniteConfiguration.PluginConfigurations属性,它持有IPluginConfiguration实现的集合,而且每一个实现又必须连接到IPluginProvider的实现:

[PluginProviderType(typeof(SemaphorePluginProvider))]
class SemaphorePluginConfiguration : IPluginConfiguration  {...}

在节点启动时,Ignite.NET会迭代插件配置,实例化插件提供者,并调用其Start(IPluginContext<SemaphorePluginConfiguration> context)方法,而后对IIgnite.GetPlugin的调用会委托给指定名字的提供者的IPluginProvider.GetPlugin

class SemaphorePluginProvider : IPluginProvider<SemaphorePluginConfiguration>
{
    private SemaphorePlugin _plugin;

    public T GetPlugin<T>() where T : class
    {
        return _plugin as T;
    }

    public void Start(IPluginContext<SemaphorePluginConfiguration> context)
    {
        _plugin = new SemaphorePlugin(context);
    }

    ...

}

经过IPluginContext能够访问Ignite实例、Ignite和插件的配置,还有GetExtension方法,会委托给Java中的PlatformPluginExtension.createTarget()方法,这样就能够在两个平台之间“创建链接”。.NET中的IPlatformTarget连接到Java中的PlatformTarget,它们能够相互调用,而且Java对象的生存周期与.NET对象的生存周期是关联的,即一旦垃圾收集器回收了.NET对象,也会释放Java对象的引用,所以Java对象也会被回收。

下面的实现很简单,只调用了对应的IPlatformTarget方法:

class SemaphorePlugin
{
    private readonly IPlatformTarget _target;  // Refers to IgniteNetPluginTarget in Java

    public SemaphorePlugin(IPluginContext<SemaphorePluginConfiguration> context)
    {
        _target = context.GetExtension(100);
    }

    public Semaphore GetOrCreateSemaphore(string name, int count)
    {
        var semaphoreTarget = _target.InStreamOutObject(0, w =>
        {
            w.WriteString(name);
            w.WriteInt(count);
        });

        return new Semaphore(semaphoreTarget);
    }
}

class Semaphore
{
    private readonly IPlatformTarget _target;  // Refers to IgniteNetSemaphore in Java

    public Semaphore(IPlatformTarget target)
    {
        _target = target;
    }

    public void WaitOne()
    {
        _target.InLongOutLong(0, 0);
    }

    public void Release()
    {
        _target.InLongOutLong(1, 0);
    }
}

这样就能够了,而且向现有插件添加更多逻辑也很容易,只需在两侧实现一对方法便可。Ignite使用JNI和非托管内存在.NET和Java平台之间使用一个进程交换数据,既简单又高效。

测试

为了演示Semaphore的分布式特性,能够运行多个Ignite节点,每一个节点都调用WaitOne(),就会看到一次只有两个节点可以获取信号量:

var ignite = Ignition.Start(cfg);
var sem = ignite.GetOrCreateSemaphore("foo", 2);

Console.WriteLine("Trying to acquire semaphore...");

sem.WaitOne();

Console.WriteLine("Semaphore acquired. Press any key to release.");
Console.ReadKey();
相关文章
相关标签/搜索