Software Development Services

Copying Context To Executor Service Threads

19 September 2017

Find the sources on Github: https://github.com/danwatford/thread-context-copy

Thread-Specific Context

If building a non-reactive Java web service such as a REST service that acts as an interface to other upstream services it is common to adopt a model of one-thread-per-request with no state held at the web service. This model can be deployed to servlet containers and application servers with ease.

For any onward requests made to upstream services it is often necessary to populate the request with information from the incoming request, possibly to identify the user to the upstream service, or to store a request-id/correlation-id which can be used to trace processing through multiple systems.

This information is not normally passed from method to method through the web service code but is instead held in temporary storage scoped to the original request. Since the one-thread-per-request model is used, this temporary storage normally ends up being ThreadLocal variables. When requests are constructed for the upstream service information can be read from these ThreadLocal variables.

Using Multiple Threads

Depending on the work to be done it may be necessary to submit multiple requests to an upstream service. By sticking with a single thread these requests will be sent sequentially which may result in unacceptable performance for the client. We can use an ExecutorService to execute multiple upstream requests concurrently, but the problem is ensuring any thread specific data is in place on the worker threads that will perform the requests.

By using a new ExecutorService to manage the concurrent upstream requests we can make use of a ContextCopyingThreadFactory to handle copying application specific context from the original thread to any new threads.

Code listing: ContextCopyingThreadFactory.java

package com.foomoo.threadutil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * A {@link java.util.concurrent.ThreadFactory} that copies thread specific ({@link java.lang.ThreadLocal} data between threads. This is useful to
 * copy context across threads for use in a {@link java.util.concurrent.ThreadPoolExecutor}.
 */
public class ContextCopyingThreadFactory implements ThreadFactory {

    private final ThreadFactory threadFactory = Executors.defaultThreadFactory();

    private final List<ContextCopier> contextCopiers;

    /**
     * Construct the {@link ContextCopyingThreadFactory} with the given {@link java.util.Collection} of {@link ContextCopier}s. If the
     * {@link Collection} is ordered, the contexts will be copied in the same order when applied to a new {@link Thread}.
     *
     * @param contextCopiers The {@link ContextCopier}s to apply to new Threads.
     */
    public ContextCopyingThreadFactory(final Collection<ContextCopier> contextCopiers) {
        this.contextCopiers = new ArrayList<>(contextCopiers);
        this.contextCopiers.forEach(ContextCopier::copy);
    }

    public Thread newThread(final Runnable r) {
        return threadFactory.newThread(makeRunnableContextCopying(r));
    }

    /**
     * Takes the given {@link Runnable} and wrap it to execute the registered context-copying operations before the {@link Runnable}'s own operations.
     *
     * @param r The {@link Runnable} to wrap.
     * @return The new {@link Runnable}.
     */
    private Runnable makeRunnableContextCopying(final Runnable r) {

        return () -> {
            contextCopiers.forEach(ContextCopier::apply);
            r.run();
        };
    }
}

The ContextCopyingThreadFactory depends on implementations of ContextCopier to perform the actual reading and writing of thread specific data. The ContextCopyingThreadFactory ensures that ContextCopier#copy will be called on the constructor thread and that ContextCopier#apply will be called on any new threads created by the ThreadFactory.

Implemenations of ContextCopier must ensure any transformations are applied to the copied data as appropriate for the application. For example, if the thread specific data is some sort of cache the implementation may choose to reuse the cache across all threads or create copies depending on whether the cache is considered thread-safe.

Code listing: ContextCopier.java

public interface ContextCopier {

    /**
     * Captures context from the current thread.
     */
    void copy();

    /**
     * Applies the captured context to the current thread.
     */
    void apply();
}

The ExecutorServiceExample (available here: https://github.com/danwatford/thread-context-copy/tree/master/threadcontextcopyexamples/src/main/java/com/foomoo/threadutils/example) demonstrates use of the ContextCopyingThreadFactory with an XRequestIdContextCopier which copies values from/to the Log4J2 ThreadContext.

The value from each thread’s ThreadContext is included in any logging output.

Code listing: ExecutorServiceExample.java

package com.foomoo.threadutils.example;

import com.foomoo.threadutil.ContextCopier;
import com.foomoo.threadutil.ContextCopyingThreadFactory;
import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ExecutorServiceExample {

    private static final String KEY = "requestId";

    /**
     * Example use case of the {@link ContextCopyingThreadFactory} where the RequestId for a server processed request is stored in thread-specific
     * storage and copied to other threads.
     * <p>
     * In this example the original request id is set and then the executor is created, making use of the {@link ContextCopyingThreadFactory} itself
     * configured with an instance of {@link XRequestIdContextCopier}. The {@link Callable}s submitted to the executor then log output to demonstrate
     * the RequestId values held in their thread specific storage.
     *
     * @throws InterruptedException Not expected to be thrown.
     */
    public static void main(final String args[]) throws InterruptedException {

        setRequestId("000");

        final Logger logger = LogManager.getLogger();
        logger.info("Example start");

        final ContextCopyingThreadFactory threadFactory = new ContextCopyingThreadFactory(ImmutableList.of(new XRequestIdContextCopier()));
        final ExecutorService executorService = Executors.newFixedThreadPool(5, threadFactory);

        IntStream.rangeClosed(1, 20)
                 .forEach(taskId -> executorService.submit(getRunnable(taskId, logger)));

        logger.info("All tasks submitted");

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.SECONDS);
    }

    private static Callable<Void> getRunnable(final int taskId, final Logger logger) {
        return () -> {
            final String padding = String.join("", Collections.nCopies(taskId, "  "));
            final String message = String.format("%s%02d", padding, taskId);
            logger.info(message);
            Thread.sleep(200);
            logger.info(message);
            return null;
        };
    }

    static void setRequestId(final String requestId) {
        ThreadContext.put(KEY, requestId);
    }

    static String getRequestId() {
        return ThreadContext.get(KEY);
    }

    /**
     * {@link com.foomoo.threadutil.ContextCopier} for interacting with the X Request Id thread-specific storage in order to copy X Request Id values to
     * new threads. Applies a suffix to the requestId for each thread created.
     */
    private static class XRequestIdContextCopier implements ContextCopier {

        private String requestId;
        private AtomicInteger applyCount = new AtomicInteger();

        @Override
        public void copy() {
            requestId = getRequestId();
        }

        @Override
        public void apply() {
            setRequestId(String.format("%s-%02d", requestId, applyCount.getAndIncrement()));
        }
    }
}

Code listing: log4j2-test.properties

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = RequestId=%X{requestId} [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n

rootLogger.level = info
rootLogger.appenderRef.stdout.ref = STDOUT