用 ListenableWorker 处理线程

在某些情况下,您可能需要提供自定义线程处理策略。例如,您可能需要处理基于回调的异步操作。在这种情况下,不能只依靠 Worker 来完成操作,因为它无法以阻塞方式完成这项任务。WorkManager 通过 ListenableWorker 支持该用例。ListenableWorker 是最低层级的工作器 API;WorkerCoroutineWorkerRxWorker 都是从这个类衍生而来的。ListenableWorker 只会发出信号以表明应该开始和停止工作,而线程处理则完全交由您负责完成。开始工作信号在主线程上调用,因此请务必手动转到您选择的后台线程。

抽象方法 ListenableWorker.startWork() 会返回一个将使用操作的 Result 设置的 ListenableFutureListenableFuture 是一个轻量级接口:它是一个 Future,用于提供附加监听器和传播异常的功能。在 startWork 方法中,应该返回 ListenableFuture,完成操作后,您需要使用操作的 Result 设置这个返回结果。您可以通过以下两种方式创建 ListenableFuture

  1. 如果您使用的是 Guava,请使用 ListeningExecutorService
  2. 否则,请将 councurrent-futures 包含到您的 gradle 文件并使用 CallbackToFutureAdapter

如果您希望基于异步回调执行某些工作,可以执行如下操作:

    public class CallbackWorker extends ListenableWorker {

        public CallbackWorker(Context context, WorkerParameters params) {
            super(context, params);
        }

        @NonNull
        @Override
        public ListenableFuture<Result> startWork() {
            return CallbackToFutureAdapter.getFuture(completer -> {
                Callback callback = new Callback() {
                    int successes = 0;

                    @Override
                    public void onFailure(Call call, IOException e) {
                        completer.setException(e);
                    }

                    @Override
                    public void onResponse(Call call, Response response) {
                        ++successes;
                        if (successes == 100) {
                            completer.set(Result.success());
                        }
                    }
                };

                for (int i = 0; i < 100; ++i) {
                    downloadAsynchronously("https://www.google.com", callback);
                }
                return callback;
            });
        }
    }
    

如果您的工作停止会发生什么?如果预计工作会停止,则始终会取消 ListenableWorkerListenableFuture。通过使用 CallbackToFutureAdapter,您只需添加一个取消监听器即可,如下所示:

    public class CallbackWorker extends ListenableWorker {

        public CallbackWorker(Context context, WorkerParameters params) {
            super(context, params);
        }

        @NonNull
        @Override
        public ListenableFuture<Result> startWork() {
            return CallbackToFutureAdapter.getFuture(completer -> {
                Callback callback = new Callback() {
                    int successes = 0;

                    @Override
                    public void onFailure(Call call, IOException e) {
                        completer.setException(e);
                    }

                    @Override
                    public void onResponse(Call call, Response response) {
                        ++successes;
                        if (successes == 100) {
                            completer.set(Result.success());
                        }
                    }
                };

                completer.addCancellationListener(cancelDownloadsRunnable, executor);

                for (int i = 0; i < 100; ++i) {
                    downloadAsynchronously("https://www.google.com", callback);
                }
                return callback;
            });
        }
    }