博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JPPF与Spring集成实现AOP的完整示例
阅读量:6690 次
发布时间:2019-06-25

本文共 5563 字,大约阅读时间需要 18 分钟。

  hot3.png

1.概述

JPPF是一个开放源码的网格计算框架,它可以在一个分布执行环境中 同时运行多个java应用。他历史比较悠久,社区也比较活跃,最新的版本已经更新到5.1.1版本,但是资料较少。 在如今Hadoop以及以Hadoop作为基础设施的各种平台的大红大紫环境下,这个平台的关注度更是不高,但是作为一个备选的解决方案和开发平台,还是有必要展示一下,供做技术选型的人参考。 本文目的有两个:一是用一个简单的例子展示一下JPPF的用法,二是演示了如何与Spring集成实现AOP的方法,仅供参考。

2.环境搭建

JPPF的整个运行环境,需要客户端,服务器(Driver)以及执行具体任务的各个节点,最简单的部署架构如下:

作为一个最简单的运行环境,分别解压从sourceforge上下载的JPPF-5.1.1-admin-ui.zip,JPPF-5.1.1-driver.zip,JPPF-5.1.1-node.zip,无需额外的配置,通过相应的启动脚本分别启动一个driver节点,一个admin-ui节点以及二个node节点,如果看到了节点初始化成功的提示信息,则表示各个节点启动成功。

3.相关代码

官方发布有一个简单的应用模板,JPPF-5.1.1-application-template.zip,这个代码非常简单,如果上述环境启动成功,直接就可以运行,但是需要配置ANT,本文的重点是Spring的集成,所以下面会贴出所有的代码,供大家参考。 与Spring的集成,我们往往需要他的两个很重要的功能特性,一个是组件的组装,这样可以使整个架构设计变的优雅,另一个就是AOP编程,通过注册拦截器,可以实现关注点的分离。这里面需要注意的就是,JPPF在部署上,是使用的分布式类加载器,这是和传统Spring应用最大的一个不同,Spring的AOP实现,最终会利用一些字节码增强技术,这是分布式类加载器不支持的,因此我们无法在任务部署到各个node节点之前就生成AOP代理对象,只能在运行时利用ProxyFactory动态生成,下面是详细的代码:

3.1.程序的入口JPPFExample

package demo;import java.util.ArrayList;import java.util.List;import org.jppf.client.JPPFClient;import org.jppf.client.JPPFConnectionPool;import org.jppf.client.JPPFJob;import org.jppf.client.Operator;import org.jppf.node.protocol.DataProvider;import org.jppf.node.protocol.MemoryMapDataProvider;import org.jppf.node.protocol.Task;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;public class JPPFExample {	public static void main(String[] args) {		try (JPPFClient jppfClient = new JPPFClient()) {			JPPFExample runner = new JPPFExample();			runner.executeMultipleConcurrentJobs(jppfClient, 4);		} catch (Exception e) {			e.printStackTrace();		}	}	public JPPFJob createJob(final String jobName) throws Exception {		ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");		JPPFJob job = new JPPFJob();		job.setName(jobName);		Task
task = job.add(ac.getBean("jppfTask")); task.setId(jobName + " - 任务"); return job; } public void executeMultipleConcurrentJobs(final JPPFClient jppfClient, final int numberOfJobs) throws Exception { ensureNumberOfConnections(jppfClient, numberOfJobs); final List
jobList = new ArrayList<>(numberOfJobs); for (int i = 1; i <= numberOfJobs; i++) { String jobName = "非阻塞作业: " + i; JPPFJob job = createJob(jobName); //DataProvider是JPPF提供的数据共享机制 DataProvider dataProvider = new MemoryMapDataProvider(); dataProvider.setParameter("jobName", jobName); job.setDataProvider(dataProvider); //非阻塞 job.setBlocking(false); //提交作业 jppfClient.submitJob(job); jobList.add(job); } System.out.println("作业执行中 ..."); for (JPPFJob job : jobList) { List
> results = job.awaitResults(); processExecutionResults(job.getName(), results); } } public synchronized void processExecutionResults(final String jobName, final List
> results) { System.out.printf("作业的结果 '%s' :\n", jobName); for (Task
task : results) { String taskName = task.getId(); if (task.getThrowable() != null) { System.out.println(taskName + ", 抛出了异常: " + task.getThrowable().getMessage()); } else { System.out.println(taskName + ", 执行结果: " + task.getResult()); } } } public void ensureNumberOfConnections(final JPPFClient jppfClient, final int numberOfConnections) throws Exception { JPPFConnectionPool pool = jppfClient.awaitActiveConnectionPool(); if (pool.getConnections().size() != numberOfConnections) { pool.setSize(numberOfConnections); } pool.awaitActiveConnections(Operator.AT_LEAST, numberOfConnections); }}

通过上述代码,我们创建了四个非阻塞的任务,并行地发送给node节点去执行。

3.2.在node节点上执行的任务

package demo;import org.jppf.node.protocol.AbstractTask;import org.springframework.aop.framework.ProxyFactory;import org.springframework.beans.factory.annotation.Autowired;public class JPPFTask extends AbstractTask
{ @Autowired private JPPFService jppfService; @Autowired private JPPFServiceInterceptor jppfServiceInterceptor; @Override public void run() { System.out.println("这是执行任务的节点!"); //这个是关健,通过下面的代码动态地为service添加拦截器 ProxyFactory pf = new ProxyFactory(jppfService); pf.setProxyTargetClass(true); pf.addAdvice(jppfServiceInterceptor); JPPFService service = (JPPFService)pf.getProxy(); String jobName = this.getDataProvider().getParameter("jobName"); try { setResult(service.execute(jobName)); } catch (InterruptedException e) { e.printStackTrace(); } }}

3.3.拦截器

这个没啥特别的:

package demo;import java.io.Serializable;import org.aopalliance.intercept.MethodInterceptor;import org.aopalliance.intercept.MethodInvocation;//要实现Serializable接口,否则无法序列化public class JPPFServiceInterceptor implements MethodInterceptor,Serializable {	@Override	public Object invoke(MethodInvocation mi) throws Throwable {		System.out.println("拦截器执行开始:");		Object obj = mi.proceed();		System.out.println("拦截器执行结束:");		return obj;	}}

3.4.业务Service

package demo;import java.io.Serializable;//要实现Serializable接口,否则无法序列化public class JPPFService implements Serializable {		public String execute(String name) throws InterruptedException{		System.out.println("任务名称:" + name);		Thread.currentThread().sleep(10000);		return "任务执行成功!";	}}

3.5.配置文件

代码非常简单。 之后可以在eclipse中运行JPPFExample,就可以在node节点的控制台上看到运行的结果。

4.控制台

JPPF的控制台如下图所示:

这个控制台功能还是很丰富的,不只是监控,还有作业的管理和维护功能。

5.总结

JPPF还是比较成熟的,和其他的网格计算平台相比,有两个突出的优点,一是虽然各种资料较少,但是文档质量较高,二是有一个非常不错的控制台,这是其他平台所不具备的,或者是收费的。 缺点呢,一是社区虽然活跃,但是不够强大,推广上或者影响力不足;二是在部署上,在复杂的拓扑环境下配置比Ignite等采用对等架构的要复杂,整体维护管理成本要高些;三是目前看不到太多的应用案例,采用这个技术的太少了,性能,稳定性等等大家更为关心的都还未知。

转载于:https://my.oschina.net/liyuj/blog/612741

你可能感兴趣的文章
RPC的实现
查看>>
不一样的Office 365之 —— 使用StaffHub管理你的排班
查看>>
从Mysql EXPLAIN探寻数据库查询优化2
查看>>
让元素居中
查看>>
php memcache保存session的一个设置误区
查看>>
鱼眼镜头
查看>>
Scalatra
查看>>
CentOS 7 三者分离编译安装LAMP
查看>>
Linux内核调整,支持4000-8000并发
查看>>
jquery mobile 设置设备适配
查看>>
redis使用总结-redis命令总结
查看>>
创业浪潮:春天蓬勃而来
查看>>
阿里云Linux安装软件镜像源
查看>>
阿里云对象存储OSS支持版本管理特性
查看>>
用python 访问redis的几种常用方式
查看>>
我的友情链接
查看>>
Linux Shell 基本概念及编程(5)
查看>>
RDBMS DBMS MS DB
查看>>
我的友情链接
查看>>
svn 实践
查看>>