CDI based @Asynchronous alternative

Arguably one of the most convenient things in EJB after declarative transactions is the @Asynchronous annotation. Applying this annotation to a method will cause it to be executed asynchronously when called (the caller does not have to wait for the method to finish executing).

The downside of this annotation is that it's only applicable to EJB beans. While EJB beans these days are lightweight and nothing to avoid in general, the fact is that in Java EE 6 and especially Java EE 7 other managed beans, specifically CDI ones, play an increasingly important role. These beans unfortunately can not directly take advantage of the platform provided @Asynchronous.

Building such support ourselves in Java EE 7 however is not that difficult. Thanks to the Java 8, and the Interceptors and Concurrency specs it's actually quite simple, but with a small caveat (see below):

We'll start with defining the annotation itself:

@InterceptorBinding
@Target({METHOD})
@Retention(RUNTIME)
@Inherited
public @interface Asynchronous {}

Next we need a helper class that effectively unwraps the dummy Future instance (of type AsyncResult, as provided by the EJB spec) that an asynchronous method returns. Such a wrapper class is needed in Java, since you otherwise can't call a method that returns say String and assign it to Future<String>. This is not specific to this CDI implementation, but is exactly how EJB's @Asynchronous works.

public class FutureDelegator implements Future<Object> {
    
    private final Future<?> future;
    
    public FutureDelegator(Future<?> future) {
        this.future = future;
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        AsyncResult<?> asyncResult = (AsyncResult<?>) future.get();
        if (asyncResult == null) {
            return null;
        }
        
        return asyncResult.get(); 
    }
    
    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        AsyncResult<?> asyncResult = (AsyncResult<?>) future.get(timeout, unit);
        if (asyncResult == null) {
            return null;
        }
        
        return asyncResult.get(); 
    }
    
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return future.cancel(mayInterruptIfRunning);
    }
    
    @Override
    public boolean isCancelled() {
        return future.isCancelled();
    }
    @Override
    public boolean isDone() {
        return future.isDone();
    }
}

With those 2 classes in place the actual interceptor can be coded as follows:

@Interceptor
@Asynchronous
@Priority(PLATFORM_BEFORE)
public class AsynchronousInterceptor implements Serializable {

    private static final long serialVersionUID = 1L;

    @Resource
    private ManagedExecutorService managedExecutorService;

    @AroundInvoke
    public Object submitAsync(InvocationContext ctx) throws Exception {
        return new FutureDelegator(managedExecutorService.submit( ()-> { return ctx.proceed(); } ));
    }
}

There are a few things to take into account here. The first is the priority of the interceptor. I put it on PLATFORM_BEFORE, which is the absolute lowest level, meaning the interceptor will likely hit before any other interceptor. If this interceptor would ship with a library it's more correct to use the lowest range reserved for libraries: LIBRARY_BEFORE.

For the actual parallel execution, the call to ctx.proceed() is scheduled on a thread pool using the Java EE Concurrency provided executor service. While this service was only recently introduced in Java EE 7, it in fact originated from a very old spec draft that was dragged into modern times. Unfortunately that spec felt it needed to use the somewhat archaic @Resource annotation for injection instead of the more modern @Inject. So that's why we use that former one here and not the latter.

A caveat is that the interceptor as given does not work on the current released versions of Weld, but in fact does work on the not yet released SNAPSHOT version. The issue is explained by Jozef on the CDI-dev mailing list.

As a temporary workaround a thread local guard can be used on Weld as follows:

@Interceptor
@Asynchronous
@Priority(PLATFORM_BEFORE)
public class AsynchronousInterceptor implements Serializable {

    private static final long serialVersionUID = 1L;
    
    @Resource
    private ManagedExecutorService managedExecutorService;
    
    private static final ThreadLocal<Boolean> asyncInvocation = new ThreadLocal<Boolean>();

    @AroundInvoke
    public synchronized Object submitAsync(InvocationContext ctx) throws Exception {
        
        if (TRUE.equals(asyncInvocation.get())) {
            return ctx.proceed();
        }
        
        return new FutureDelegator(managedExecutorService.submit( ()-> { 
            try {
                asyncInvocation.set(TRUE);
                return ctx.proceed();
            } finally {
                 asyncInvocation.remove();
            }
        }));
  
    }
}

Future work

The interceptor shown here is just a bare bones copy of the EJB version, but lacks the setup of a request scope. Going further however we can add additional features, like using a completable future, optionally named thread pools, etc.

Arjan Tijms

Comments

Popular posts from this blog

Jakarta EE Survey 2022

Implementing container authentication in Java EE with JASPIC

Counting the rows returned from a JPA query