Подтвердить что ты не робот

Распространение ThreadLocal на новую тему, полученную из ExecutorService

Я запускаю процесс в отдельном потоке с таймаутом, используя ExecutorService и Будущее (пример кода здесь) (происходит поток "нереста" в AOP Aspect).

Теперь основной поток представляет собой запрос Resteasy. Resteasy использует одну или несколько переменных ThreadLocal для хранения некоторой информации контекста, которую мне нужно получить в какой-то момент в вызове метода Rest. Проблема в том, что, поскольку поток Resteasy запущен в новом потоке, переменные ThreadLocal arelost.

Каким будет лучший способ "распространять" любую переменную ThreadLocal, используемую Resteasy для нового потока? Кажется, что Resteasy использует более одной переменной ThreadLocal для отслеживания контекстной информации, и я хотел бы "слепо" передать всю информацию в новый поток.

Я просмотрел подклассификацию ThreadPoolExecutor и используя метод beforeExecute, чтобы передать текущий поток в пул, но я не мог 't найти способ передать переменные ThreadLocal в пул.

Любое предложение?

Спасибо

4b9b3361

Ответ 1

Набор экземпляров ThreadLocal, связанных с потоком, хранится в частных членах каждого Thread. Ваш единственный шанс перечислить их - сделать некоторое отражение на Thread; таким образом, вы можете переопределить ограничения доступа в полях потока.

Как только вы можете получить набор ThreadLocal, вы можете скопировать в потоки фона с помощью beforeExecute() и afterExecute() перехватчиков ThreadPoolExecutor, или создав обертку Runnable для ваших задач, которая перехватывает run() вызов, чтобы установить ненужные экземпляры ThreadLocal. Фактически, последний способ может работать лучше, поскольку он предоставит вам удобное место для хранения значений ThreadLocal во время постановки задачи в очередь.


Обновление: здесь более конкретная иллюстрация второго подхода. Вопреки моему первоначальному описанию, все, что хранится в оболочке, является вызывающим потоком, который допрашивается при выполнении задачи.

static Runnable wrap(Runnable task)
{
  Thread caller = Thread.currentThread();
  return () -> {
    Iterable<ThreadLocal<?>> vars = copy(caller);
    try {
      task.run();
    }
    finally {
      for (ThreadLocal<?> var : vars)
        var.remove();
    }
  };
}

/**
 * For each {@code ThreadLocal} in the specified thread, copy the thread 
 * value to the current thread.  
 * 
 * @param caller the calling thread
 * @return all of the {@code ThreadLocal} instances that are set on current thread
 */
private static Collection<ThreadLocal<?>> copy(Thread caller)
{
  /* Use a nasty bunch of reflection to do this. */
  throw new UnsupportedOperationException();
}

Ответ 2

Как я понимаю вашу проблему, вы можете взглянуть на InheritableThreadLocal, который предназначен для передачи переменных ThreadLocal из контекста родительского потока Контекст дочернего потока

Ответ 3

Основываясь на ответе @erickson, я написал этот код. Он работает для inheritableThreadLocals. Он создает список inheritableThreadLocals, используя тот же метод, что и в контрукторе Thread. Конечно, я использую размышления, чтобы сделать это. Также я переопределяю класс исполнителя.

public class MyThreadPoolExecutor extends ThreadPoolExecutor
{
   @Override
   public void execute(Runnable command)
   {
      super.execute(new Wrapped(command, Thread.currentThread()));
   }
}

Упаковочный:

   private class Wrapped implements Runnable
   {
      private final Runnable task;

      private final Thread caller;

      public Wrapped(Runnable task, Thread caller)
      {
         this.task = task;
         this.caller = caller;
      }

      public void run()
      {
         Iterable<ThreadLocal<?>> vars = null;
         try
         {
            vars = copy(caller);
         }
         catch (Exception e)
         {
            throw new RuntimeException("error when coping Threads", e);
         }
         try {
            task.run();
         }
         finally {
            for (ThreadLocal<?> var : vars)
               var.remove();
         }
      }
   }

метод копирования:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
   {
      List<ThreadLocal<?>> threadLocals = new ArrayList<>();
      Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
      field.setAccessible(true);
      Object map = field.get(caller);
      Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
      table.setAccessible(true);

      Method method = ThreadLocal.class
              .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
      method.setAccessible(true);
      Object o = method.invoke(null, map);

      Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
      field2.setAccessible(true);
      field2.set(Thread.currentThread(), o);

      Object tbl = table.get(o);
      int length = Array.getLength(tbl);
      for (int i = 0; i < length; i++)
      {
         Object entry = Array.get(tbl, i);
         Object value = null;
         if (entry != null)
         {
            Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
                    "get");
            referentField.setAccessible(true);
            value = referentField.invoke(entry);
            threadLocals.add((ThreadLocal<?>) value);
         }
      }
      return threadLocals;
   }

Ответ 4

Вот пример, чтобы передать текущий LocaleContext в родительском потоке дочерний поток, натянутый на CompletableFuture [По умолчанию используется ForkJoinPool].

Просто определите все, что вы хотели сделать в дочернем потоке внутри блока Runnable. Поэтому, когда CompletableFuture выполняет блок Runnable, его дочерний поток, который находится под контролем и voila, у вас есть родительский материал ThreadLocal, установленный в Child ThreadLocal.

Проблема здесь заключается не в полном копировании ThreadLocal. Скопирован только файл LocaleContext. Поскольку ThreadLocal имеет частный доступ только к Thread, он тоже использует Reflection и пытается получить и установить в Child, это слишком много шуточных вещей, которые могут привести к утечке памяти или повышению производительности.

Итак, если вы знаете параметры, которые вас интересуют от ThreadLocal, то это решение работает чище.

 public void parentClassMethod(Request request) {
        LocaleContext currentLocale = LocaleContextHolder.getLocaleContext();
        executeInChildThread(() -> {
                LocaleContextHolder.setLocaleContext(currentLocale);
                //Do whatever else you wanna do
            }));

        //Continue stuff you want to do with parent thread
}


private void executeInChildThread(Runnable runnable) {
    try {
        CompletableFuture.runAsync(runnable)
            .get();
    } catch (Exception e) {
        LOGGER.error("something is wrong");
    }
}

Ответ 5

Мне не нравится подход Reflection. Альтернативным решением было бы реализовать оболочку исполнителя и передать объект непосредственно в контексте ThreadLocal ко всем дочерним потокам, распространяющим родительский контекст.

public class PropagatedObject {

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();

   //put, set, merge methods, etc

}

== >

public class ObjectAwareExecutor extends AbstractExecutorService {

    private final ExecutorService delegate;
    private final PropagatedObject objectAbsorber;

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){
        this.delegate = delegate;
        this.objectAbsorber = objectAbsorber;
    }
    @Override
    public void execute(final Runnable command) {

        final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
        delegate.execute(() -> {
            try{
                objectAbsorber.set(parentContext);
                command.run();
            }finally {
                parentContext.putAll(objectAbsorber.get());
                objectAbsorber.clean();
            }
        });
        objectAbsorber.merge(parentContext);
    }

Ответ 6

Если вы посмотрите код ThreadLocal, вы можете увидеть:

    public T get() {
        Thread t = Thread.currentThread();
        ...
    }

текущий поток не может быть перезаписан.

Возможные решения:

  • Посмотрите на механизм java 7 fork/join (но я думаю, что это плохой способ)

  • Посмотрите одобренный механизм, чтобы перезаписать класс ThreadLocal в вашей JVM.

  • Попробуйте переписать RESTEasy (вы можете использовать инструменты Refactor в своей среде IDE для замены всего использования ThreadLocal, это выглядит легко)