Asynchronous
★EJBの@Asynchronousについて
書き方1
/**
 * Stateless bean in which {@code @Asynchronous} is applied to a single method.
 */
@Stateless
public class MyEJB {
// Only this method carries the @Asynchronous annotation.
@Asynchronous
public void printOrder(Order order) {
...
}
}
書き方2
/**
 * Stateless bean in which {@code @Asynchronous} is applied at class level
 * instead of on individual methods.
 */
@Stateless
@Asynchronous
public class MyEJB {
// Session context, used below to check whether cancel was called.
@Resource
private SessionContext ctx;
public void printOrder(Order order) {
...
}
/**
 * Returns a status code wrapped in an {@code AsyncResult}.
 *
 * @param order the order to send
 * @return 2 when {@code ctx.wasCancelCalled()} is true at the check below,
 *         otherwise the value of {@code status} at the end of the method
 */
public Future<Integer> sendOrder(Order order) {
Integer status = 0;
...
status = 1;
// Return early with status 2 when wasCancelCalled() reports true.
if (ctx.wasCancelCalled()) {
return new AsyncResult<Integer>(2);
}
...
return new AsyncResult<Integer>(status);
}
}
利用例
// Invoke the method and keep the Future it returns.
Future<Integer> result = myEJB.sendOrder(order);
// Read the status value out of the Future.
Integer status = result.get();
サンプル1
/**
 * Singleton sample whose {@link #addJob(String)} sleeps for ten seconds and
 * then hands the job name back through a {@code Future}.
 */
@Singleton
public class JobProcessor {

    /**
     * Sleeps for ten seconds, then returns the given job name.
     *
     * @param jobName name of the job; returned unchanged as the result
     * @return an {@code AsyncResult} holding {@code jobName}
     * @throws IllegalStateException if the thread is interrupted while sleeping
     */
    @Asynchronous
    @Lock(LockType.READ)
    @AccessTimeout(-1)
    public Future<String> addJob(String jobName) {
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can see it.
            // (Thread.interrupted() would clear the flag rather than set it.)
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new AsyncResult<String>(jobName);
    }
}
@Inject
private JobProcessor processor;
// Takes about 20 seconds when synchronous, about 10 seconds when asynchronous
final Future<String> job1 = processor.addJob("job1");
final Future<String> job2 = processor.addJob("job2");
// Both jobs are submitted before either result is read.
String status1 = job1.get();
String status2 = job2.get();
★Asynchronous CDI Events
サンプル2
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.interceptor.AroundInvoke;
import javax.interceptor.InvocationContext;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
 * Singleton that submits an initialisation task through {@code Executor} in its
 * {@code @PostConstruct} callback and waits for that task in an
 * {@code @AroundInvoke} interceptor before proceeding with the invocation.
 */
@Singleton
@Lock(LockType.READ)
public class Starter {
@EJB
private Executor executor;
// Future returned by executor.submit() in construct().
private Future future;
@Getter
private String field;
/**
 * Submits a task that assigns {@code field} and stores the returned Future.
 */
@PostConstruct
private void construct() throws Exception {
future = executor.submit(new Callable() {
@Override
public Object call() throws Exception {
...
Starter.this.field = "xxx";
return null;
}
});
}
/**
 * Waits on the Future stored by {@code construct()} and then proceeds with
 * the intercepted invocation.
 */
@AroundInvoke
private Object complete(InvocationContext context) throws Exception {
future.get();
return context.proceed();
}
/**
 * Returns the given job name wrapped in an {@code AsyncResult}.
 */
@Asynchronous // can also be applied at class level
@Lock(LockType.READ)
@AccessTimeout(-1) // wait forever
//@AccessTimeout(0) // execute now
//@AccessTimeout(value = 5, unit = TimeUnit.SECONDS)
public Future<String> addJob(String jobName) {
...
return new AsyncResult<String>(jobName);
}
}
import javax.ejb.AsyncResult;
import javax.ejb.Asynchronous;
/**
 * Singleton helper that runs a {@link Callable} from an {@code @Asynchronous}
 * method and exposes the outcome as a {@link Future}.
 */
@Singleton
@Lock(LockType.READ)
public class Executor {

    /**
     * Calls the task and wraps whatever it returns.
     *
     * @param task the work to run
     * @param <T>  type of the value produced by the task
     * @return an {@code AsyncResult} holding the task's return value
     * @throws Exception whatever {@code task.call()} throws
     */
    @Asynchronous
    public <T> Future<T> submit(Callable<T> task) throws Exception {
        final T outcome = task.call();
        return new AsyncResult<T>(outcome);
    }
}
サンプル3
/**
 * Stateless service that persists a {@code Customer} and then fires a
 * {@code MailEvent}.
 */
@Stateless
public class CustomerService {
@Inject
private EntityManager em;
// Used by sendEmail() to fire the MailEvent.
@Inject
private Event<MailEvent> eventProducer;
/**
 * Creates and persists a customer, then calls {@code sendEmail()}.
 */
public void saveSuccess() {
Customer customer = new Customer();
...
em.persist(customer);
sendEmail();
}
/**
 * Builds a {@code MailEvent} (recipient, subject, message) and fires it.
 */
private void sendEmail() {
MailEvent event = new MailEvent();
event.setTo("someone@example.com");
event.setSubject("...");
event.setMessage("...");
eventProducer.fire(event);
}
}
/**
 * Singleton that observes {@code MailEvent} and sends it as a plain-text
 * message through the injected mail session.
 */
@Singleton
public class MailService {

    @Resource(mappedName = "java:jboss/mail/Default")
    private Session mailSession;

    /**
     * Observer declared with {@code TransactionPhase.AFTER_SUCCESS}; builds a
     * message from the event and sends it.
     *
     * @param event source of the recipient, subject and message text
     * @throws RuntimeException wrapping any {@code MessagingException}
     */
    @Asynchronous
    @Lock(LockType.READ)
    public void sendMail(@Observes(during = TransactionPhase.AFTER_SUCCESS) MailEvent event) {
        try {
            Transport.send(compose(event));
        } catch (MessagingException failure) {
            throw new RuntimeException(failure);
        }
    }

    /** Builds the MIME message for the given event. */
    private MimeMessage compose(MailEvent event) throws MessagingException {
        MimeMessage mail = new MimeMessage(mailSession);
        InternetAddress recipient = new InternetAddress(event.getTo());
        Address[] recipients = new InternetAddress[] {recipient};
        mail.setRecipients(Message.RecipientType.TO, recipients);
        mail.setSubject(event.getSubject());
        mail.setSentDate(new java.util.Date());
        mail.setContent(event.getMessage(), "text/plain");
        return mail;
    }
}
Default Synchronous
/**
 * JAX-RS resource that fires {@code MyEvent} instances through an injected
 * CDI {@code Event}.
 */
@Path("/xxx")
public class EventProducer {
@Inject
private Event<MyEvent> events;
/**
 * Logs and fires one {@code MyEvent} per index from 0 up to {@code number - 1}.
 *
 * @param number how many events to fire, taken from the {@code num} path parameter
 */
@Path("/{num}")
@GET
public String produceEvents(@PathParam("num") int number) {
for (int i = 0; i < number; i++) {
MyEvent event = new MyEvent(i);
logger.info("Producing Event: " + event);
events.fire(event);
}
...
}
}
/**
 * Plain CDI observer of {@code MyEvent}.
 */
public class EventConsumer {

    /**
     * Logs the received event and then sleeps for 500 milliseconds.
     *
     * @param event the observed event
     * @throws InterruptedException if the sleep is interrupted
     */
    public void consumeEvent(@Observes MyEvent event) throws InterruptedException {
        final String line = "Consuming event: " + event;
        logger.info(line);
        TimeUnit.MILLISECONDS.sleep(500L);
    }
}
time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]
time2 (http-/127.0.0.1:8080-1) Consuming event: MyEvent[seqNo=0]
time3 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]
time4 (http-/127.0.0.1:8080-1) Consuming event: MyEvent[seqNo=1]
Asynchronousにする方法
方法1 CDI Producer and Singleton EJB Receiver
// EventProducer is the same as above
/**
 * Singleton EJB observer whose observer method is marked {@code @Asynchronous}.
 */
@Singleton
public class EventConsumer {
@Asynchronous
// @Lock(LockType.WRITE) is the default
public void consumeEvent(@Observes MyEvent event) { ... }
}
time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]
time2 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]
time3 (EJB default – 2) Consuming event: MyEvent[seqNo=1]
time4 (EJB default – 1) Consuming event: MyEvent[seqNo=0]
※Thread-safe observer method: yes
方法2 Singleton EJB Receiver With Read Lock
// EventProducer is the same as above
/**
 * Singleton EJB observer whose {@code @Asynchronous} observer method is
 * declared with {@code LockType.READ}.
 */
@Singleton
public class EventConsumer {
@Asynchronous
@Lock(LockType.READ)
public void consumeEvent(@Observes MyEvent event) { ... }
}
time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]
time2 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]
time3 (EJB default – 1) Consuming event: MyEvent[seqNo=0]
time3 (EJB default – 2) Consuming event: MyEvent[seqNo=1]
※Thread-safe observer method: no
※Observer method starts processing events as soon as the first one is fired
方法3 EJB Producer and CDI Consumer
/**
 * Event producer declared as a stateless EJB as well as a JAX-RS resource.
 */
@Stateless
// @TransactionAttribute(TransactionAttributeType.REQUIRED) is the default
@Path("/xxx")
public class EventProducer {
@Path("/{num}")
@GET
public String produceEvents(@PathParam("num") int number) { ... }
}
/**
 * Plain CDI observer of {@code MyEvent} declared with
 * {@code TransactionPhase.AFTER_COMPLETION}.
 */
public class EventConsumer {
public void consumeEvent(
@Observes(during = TransactionPhase.AFTER_COMPLETION)
MyEvent event) { ... }
}
time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]
time2 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]
time3 (http-/127.0.0.1:8080-1) Consuming event: MyEvent[seqNo=1]
time4 (http-/127.0.0.1:8080-1) Consuming event: MyEvent[seqNo=0]
※Thread-safe observer method: yes
方法4 EJB Producer and EJB Consumer
/**
 * Event producer declared as a stateless EJB as well as a JAX-RS resource.
 */
@Stateless
@Path("/xxx")
public class EventProducer {
@Path("/{num}")
@GET
public String produceEvents(@PathParam("num") int number) { ... }
}
/**
 * Singleton EJB observer combining {@code @Asynchronous}, {@code LockType.READ}
 * and {@code TransactionPhase.AFTER_COMPLETION}.
 */
@Singleton
public class EventConsumer {
@Asynchronous
@Lock(LockType.READ)
public void consumeEvent(
@Observes(during = TransactionPhase.AFTER_COMPLETION)
MyEvent event) { ... }
}
time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]
time2 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]
time3 (EJB default – 2) Consuming event: MyEvent[seqNo=1]
time3 (EJB default – 1) Consuming event: MyEvent[seqNo=0]
※Thread-safe observer method: no
※Observer method starts processing only after all events have been fired (i.e. when the transaction completes).
方法5 EJB Producer and CDI Consumer II
/**
 * Stateless EJB / JAX-RS resource that fires each {@code MyEvent} from its own
 * {@code @Asynchronous} business method.
 */
@Stateless
@Path("/xxx")
public class EventProducer {

    @Resource
    private SessionContext sctx;

    // Event source used by fireEvent(); this declaration was missing from the snippet.
    @Inject
    private Event<MyEvent> events;

    /**
     * Calls {@link #fireEvent(MyEvent)} once per index from 0 up to {@code number - 1}.
     *
     * @param number how many events to fire, taken from the {@code num} path parameter
     */
    @Path("/{num}")
    @GET
    public String produceEvents(@PathParam("num") int number) {
        for (int i = 0; i < number; i++) {
            // Call through the business object obtained from the session context
            // rather than this.fireEvent(), so that @Asynchronous is applied
            // (NOTE(review): per EJB proxy semantics - confirm for the target container).
            sctx.getBusinessObject(EventProducer.class)
                .fireEvent(new MyEvent(i));
        }
        ...
    }

    /**
     * Fires the given event through the injected {@code Event}.
     *
     * @param event the event to fire
     */
    @Asynchronous
    public void fireEvent(final MyEvent event) {
        events.fire(event);
    }
}
/**
 * Plain CDI observer of {@code MyEvent}; carries no EJB annotations itself.
 */
public class EventConsumer {
public void consumeEvent(@Observes MyEvent event) { ... }
}
time1 (EJB default – 2) Producing Event: MyEvent[seqNo=1]
time1 (EJB default – 1) Producing Event: MyEvent[seqNo=0]
time2 (EJB default – 1) Consuming event: MyEvent[seqNo=0]
time2 (EJB default – 2) Consuming event: MyEvent[seqNo=1]
※Thread-safe observer method: no
方法6 portable CDI extension
/**
 * Portable CDI extension (implements {@code Extension}); body elided in these notes.
 */
public class AsyncEventExtension implements Extension {
...
}
/**
 * Qualifier annotation for parameters, retained at runtime.
 */
@Qualifier
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncEvent {}
/**
 * Observer method whose event parameter is additionally qualified with
 * {@code @AsyncEvent}.
 */
public void consumeEvent(@Observes @AsyncEvent MyEvent event) {
...
}
方法7 CDI With JMS