Asynchronous

★EJBの@Asynchronousについて

書き方1

/**
 * Style 1: a stateless session bean where {@code @Asynchronous} is placed on
 * an individual method.
 */
@Stateless
public class MyEJB {

    /**
     * Asynchronous method with a {@code void} return type, so nothing is
     * handed back to the caller.
     *
     * @param order the order to process (the body is elided in these notes)
     */
    @Asynchronous
    public void printOrder(Order order) {
        ...
    }
}

書き方2

/**
 * Style 2: {@code @Asynchronous} is placed on the class instead of on each
 * method.
 */
@Stateless
@Asynchronous
public class MyEJB {

    /** Session context; {@link #sendOrder(Order)} asks it whether cancel was called. */
    @Resource
    private SessionContext ctx;

    /**
     * Variant with a {@code void} return type.
     *
     * @param order the order to process (the body is elided in these notes)
     */
    public void printOrder(Order order) {
        ...
    }

    /**
     * Variant that reports a status code through a {@link Future}.
     *
     * @param order the order to process
     * @return an {@code AsyncResult} holding {@code 2} when
     *         {@code ctx.wasCancelCalled()} is true at the check, otherwise
     *         the value of {@code status} at the end of the method
     */
    public Future<Integer> sendOrder(Order order) {
        Integer status = 0;
        ...
        status = 1;

        // Stop early when the session context reports that cancel was called.
        final boolean cancelled = ctx.wasCancelCalled();
        if (cancelled) {
            return new AsyncResult<Integer>(2);
        }
        ...
        return new AsyncResult<Integer>(status);
    }
}

利用例

// The call hands back a Future for the status code.
Future<Integer> result = myEJB.sendOrder(order);
// Future.get() waits for completion and yields the wrapped value.
Integer status = result.get();

サンプル1

/**
 * Sample 1: a singleton bean exposing an asynchronous job method.
 */
@Singleton
public class JobProcessor {

    /**
     * Sleeps for ten seconds and then returns the job name wrapped in an
     * {@code AsyncResult}.
     *
     * <p>The method is declared with a READ lock and an access timeout of
     * {@code -1}.
     *
     * @param jobName the name that is echoed back as the result
     * @return a {@link Future} holding {@code jobName}
     * @throws IllegalStateException if the thread is interrupted while sleeping
     */
    @Asynchronous
    @Lock(LockType.READ)
    @AccessTimeout(-1)
    public Future<String> addJob(String jobName) {
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe it. Thread.interrupted() would clear the flag instead.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new AsyncResult<String>(jobName);
    }
}

@Inject
private JobProcessor processor;

// Takes about 20 seconds if the calls are synchronous, about 10 seconds if asynchronous.
final Future<String> job1 = processor.addJob("job1");
final Future<String> job2 = processor.addJob("job2");

// Collect both results only after both jobs have been submitted.
String status1 = job1.get();
String status2 = job2.get();

★Asynchronous CDI Events

event

サンプル2

import javax.annotation.PostConstruct;

import javax.ejb.EJB;

import javax.ejb.Lock;

import javax.ejb.LockType;

import javax.ejb.Singleton;

import javax.interceptor.AroundInvoke;

import javax.interceptor.InvocationContext;

import java.util.concurrent.Callable;

import java.util.concurrent.Future;

/**
 * Sample 2: a singleton that runs its initialisation through the
 * {@code Executor} bean and makes every intercepted call wait for that
 * initialisation to finish.
 *
 * <p>{@code Executor} here is the bean injected with {@code @EJB}, not
 * {@code java.util.concurrent.Executor}.
 */
@Singleton
@Lock(LockType.READ)
public class Starter {

    @EJB
    private Executor executor;

    /** Handle on the initialisation task submitted in {@link #construct()}. */
    private Future<?> future;

    /** Assigned by the initialisation task. */
    @Getter
    private String field;

    /**
     * Submits the initialisation task and keeps its {@link Future}.
     *
     * @throws Exception propagated from {@code executor.submit}
     */
    @PostConstruct
    private void construct() throws Exception {
        // Parameterised Callable instead of the raw type, so no unchecked warnings.
        future = executor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                ...
                Starter.this.field = "xxx";
                return null;
            }
        });
    }

    /**
     * Interceptor that waits for the initialisation task before letting the
     * invocation proceed.
     *
     * @param context the intercepted invocation
     * @return whatever {@code context.proceed()} returns
     * @throws Exception from {@code future.get()} or from the invocation itself
     */
    @AroundInvoke
    private Object complete(InvocationContext context) throws Exception {
        future.get();
        return context.proceed();
    }

    /**
     * Asynchronous job method returning the job name wrapped in an
     * {@code AsyncResult}.
     *
     * @param jobName the name that is echoed back as the result
     * @return a {@link Future} holding {@code jobName}
     */
    @Asynchronous // can also be placed on the class
    @Lock(LockType.READ)
    @AccessTimeout(-1) // wait forever
    //@AccessTimeout(0) // no waiting: concurrent access is not permitted
    //@AccessTimeout(value = 5, unit = TimeUnit.SECONDS)
    public Future<String> addJob(String jobName) {
        ...
        return new AsyncResult<String>(jobName);
    }
}

import javax.ejb.AsyncResult;

import javax.ejb.Asynchronous;

/**
 * Singleton with a READ lock that runs a {@link Callable} inside an
 * {@code @Asynchronous} method.
 */
@Singleton
@Lock(LockType.READ)
public class Executor {

    /**
     * Calls the task and wraps its result in an {@code AsyncResult}.
     *
     * @param task the work to run
     * @param <T>  the task's result type
     * @return a {@link Future} holding the value returned by {@code task.call()}
     * @throws Exception whatever {@code task.call()} throws
     */
    @Asynchronous
    public <T> Future<T> submit(Callable<T> task) throws Exception {
        final T outcome = task.call();
        return new AsyncResult<T>(outcome);
    }
}

サンプル3

/**
 * Sample 3: stateless bean that persists a customer and then fires a
 * {@code MailEvent}.
 */
@Stateless
public class CustomerService {

    @Inject
    private EntityManager em;

    /** Source of the {@code MailEvent} fired by {@link #sendEmail()}. */
    @Inject
    private Event<MailEvent> eventProducer;

    /** Persists a new {@code Customer} and then fires the mail event. */
    public void saveSuccess() {
        Customer customer = new Customer();
        ...
        em.persist(customer);

        sendEmail();
    }

    /** Builds a {@code MailEvent} with recipient, subject and message, then fires it. */
    private void sendEmail() {
        final MailEvent mail = new MailEvent();
        mail.setTo("someone@example.com");
        mail.setSubject("...");
        mail.setMessage("...");

        eventProducer.fire(mail);
    }
}

/**
 * Singleton that turns an observed {@code MailEvent} into an e-mail.
 */
@Singleton
public class MailService {

    /** Mail session looked up under the mapped name {@code java:jboss/mail/Default}. */
    @Resource(mappedName = "java:jboss/mail/Default")
    private Session mailSession;

    /**
     * Observes {@code MailEvent} in the {@code AFTER_SUCCESS} transaction phase
     * and sends a plain-text message built from it.
     *
     * @param event supplies the recipient, subject and message text
     * @throws RuntimeException wrapping any {@code MessagingException}
     */
    @Asynchronous
    @Lock(LockType.READ)
    public void sendMail(@Observes(during = TransactionPhase.AFTER_SUCCESS) MailEvent event) {
        try {
            final MimeMessage message = new MimeMessage(mailSession);

            final InternetAddress recipient = new InternetAddress(event.getTo());
            final Address[] recipients = new InternetAddress[] {recipient};
            message.setRecipients(Message.RecipientType.TO, recipients);

            message.setSubject(event.getSubject());
            message.setSentDate(new java.util.Date());
            message.setContent(event.getMessage(), "text/plain");

            Transport.send(message);
        } catch (MessagingException e) {
            // Keep the cause so the original failure is not lost.
            throw new RuntimeException(e);
        }
    }
}

Default Synchronous

/**
 * REST resource under {@code /xxx} that fires a series of {@code MyEvent}s.
 */
@Path("/xxx")
public class EventProducer {

    @Inject
    private Event<MyEvent> events;

    /**
     * Fires {@code number} events, numbered from 0, logging each one first.
     *
     * @param number how many events to fire, taken from the {@code num} path parameter
     * @return the response body (elided in these notes)
     */
    @Path("/{num}")
    @GET
    public String produceEvents(@PathParam("num") int number) {
        for (int seq = 0; seq < number; seq++) {
            final MyEvent event = new MyEvent(seq);
            logger.info("Producing Event: " + event);
            events.fire(event);
        }
        ...
    }
}

/**
 * Plain observer class for {@code MyEvent}.
 */
public class EventConsumer {

    /**
     * Logs the observed event and then sleeps for 500 milliseconds.
     *
     * @param event the observed event
     * @throws InterruptedException if the sleep is interrupted
     */
    public void consumeEvent(@Observes MyEvent event) throws InterruptedException {
        logger.info("Consuming event: " + event);
        TimeUnit.MILLISECONDS.sleep(500);
    }
}

time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]

time2 (http-/127.0.0.1:8080-1) Consuming event: MyEvent[seqNo=0]

time3 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]

time4 (http-/127.0.0.1:8080-1) Consuming event: MyEvent[seqNo=1]

Asynchronousにする方法

方法1 CDI Producer and Singleton EJB Receiver

//EventProducerは同上

/**
 * Method 1: singleton EJB whose observer method is asynchronous.
 */
@Singleton
public class EventConsumer {

    /**
     * Asynchronous observer for {@code MyEvent}.
     *
     * @param event the observed event
     */
    @Asynchronous
    // @Lock(LockType.WRITE) is the default, so it is not written out.
    public void consumeEvent(@Observes MyEvent event) {
        ...
    }
}

time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]

time2 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]

time3 (EJB default – 2) Consuming event: MyEvent[seqNo=1]

time4 (EJB default – 1) Consuming event: MyEvent[seqNo=0]

※Thread-safe observer method: yes

方法2 Singleton EJB Receiver With Read Lock

//EventProducerは同上

/**
 * Method 2: singleton EJB whose asynchronous observer method uses a READ lock.
 */
@Singleton
public class EventConsumer {

    /**
     * Asynchronous observer for {@code MyEvent}, declared with
     * {@code LockType.READ}.
     *
     * @param event the observed event
     */
    @Asynchronous
    @Lock(LockType.READ)
    public void consumeEvent(@Observes MyEvent event) {
        ...
    }
}

time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]

time2 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]

time3 (EJB default – 1) Consuming event: MyEvent[seqNo=0]

time3 (EJB default – 2) Consuming event: MyEvent[seqNo=1]

※Thread-safe observer method: no

※Observer method will start processing events after the first one is fired

方法3 EJB Producer and CDI Consumer

/**
 * Method 3: the producer is a stateless EJB exposed as a REST resource under
 * {@code /xxx}.
 */
@Stateless
// @TransactionAttribute(TransactionAttributeType.REQUIRED) is the default, so it is not written out.
@Path("/xxx")
public class EventProducer {

    /**
     * @param number taken from the {@code num} path parameter
     * @return the response body (elided in these notes)
     */
    @Path("/{num}")
    @GET
    public String produceEvents(@PathParam("num") int number) {
        ...
    }
}

/**
 * Method 3: plain observer class bound to a transaction phase.
 */
public class EventConsumer {

    /**
     * Observes {@code MyEvent} in the {@code AFTER_COMPLETION} transaction phase.
     *
     * @param event the observed event
     */
    public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent event) {
        ...
    }
}

time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]

time2 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]

time3 (http-/127.0.0.1:8080-1) Consuming event: MyEvent[seqNo=1]

time4 (http-/127.0.0.1:8080-1) Consuming event: MyEvent[seqNo=0]

※Thread-safe observer method: yes

方法4 EJB Producer and EJB Consumer

/**
 * Method 4: the producer is a stateless EJB exposed as a REST resource under
 * {@code /xxx}.
 */
@Stateless
@Path("/xxx")
public class EventProducer {

    /**
     * @param number taken from the {@code num} path parameter
     * @return the response body (elided in these notes)
     */
    @Path("/{num}")
    @GET
    public String produceEvents(@PathParam("num") int number) {
        ...
    }
}

/**
 * Method 4: singleton EJB observer that is asynchronous and bound to a
 * transaction phase.
 */
@Singleton
public class EventConsumer {

    /**
     * Asynchronous observer for {@code MyEvent} in the {@code AFTER_COMPLETION}
     * transaction phase, declared with {@code LockType.READ}.
     *
     * @param event the observed event
     */
    @Asynchronous
    @Lock(LockType.READ)
    public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent event) {
        ...
    }
}

time1 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=0]

time2 (http-/127.0.0.1:8080-1) Producing Event: MyEvent[seqNo=1]

time3 (EJB default – 2) Consuming event: MyEvent[seqNo=1]

time3 (EJB default – 1) Consuming event: MyEvent[seqNo=0]

※Thread-safe observer method: no

※Observer method will start processing after all events have been fired (transaction completes).

方法5 EJB Producer and CDI Consumer II

/**
 * Method 5: a stateless EJB producer that fires each event from its own
 * {@code @Asynchronous} method.
 */
@Stateless
@Path("/xxx")
public class EventProducer {

    @Resource
    private SessionContext sctx;

    /** Event source used by {@link #fireEvent(MyEvent)}. */
    @Inject
    private Event<MyEvent> events;

    /**
     * Calls {@link #fireEvent(MyEvent)} once per event, numbered from 0.
     *
     * @param number how many events to fire, taken from the {@code num} path parameter
     * @return the response body (elided in these notes)
     */
    @Path("/{num}")
    @GET
    public String produceEvents(@PathParam("num") int number) {
        for (int i = 0; i < number; i++) {
            // Call through the business object from the session context rather
            // than this.fireEvent(...), presumably so that the container
            // applies @Asynchronous to the call.
            sctx.getBusinessObject(EventProducer.class)
                    .fireEvent(new MyEvent(i));
        }
        ...
    }

    /**
     * Fires a single event.
     *
     * @param event the event to fire
     */
    @Asynchronous
    public void fireEvent(final MyEvent event) {
        events.fire(event);
    }
}

/**
 * Method 5: plain observer class for {@code MyEvent}.
 */
public class EventConsumer {

    /**
     * @param event the observed event
     */
    public void consumeEvent(@Observes MyEvent event) {
        ...
    }
}

time1 (EJB default – 2) Producing Event: MyEvent[seqNo=1]

time1 (EJB default – 1) Producing Event: MyEvent[seqNo=0]

time2 (EJB default – 1) Consuming event: MyEvent[seqNo=0]

time2 (EJB default – 2) Consuming event: MyEvent[seqNo=1]

※Thread-safe observer method: no

方法6 portable CDI extension

/**
 * Method 6: portable CDI extension (the implementation is elided in these notes).
 */
public class AsyncEventExtension implements Extension {
    ...
}

/**
 * Qualifier annotation for parameters, retained at runtime.
 */
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface AsyncEvent {
}

/**
 * Observer for {@code MyEvent} whose parameter carries the {@code @AsyncEvent}
 * qualifier.
 *
 * @param event the observed event
 */
public void consumeEvent(@Observes @AsyncEvent MyEvent event) {
    ...
}

方法7 CDI With JMS