浅谈Android中的Handler


Handler.post(runnable) vs Handler.sendMessage(Message msg)

    public final boolean post(@NonNull Runnable r) {
       return  sendMessageDelayed(getPostMessage(r), 0);
    }

    private static Message getPostMessage(Runnable r) {
        Message m = Message.obtain();
        // Message的callback即是runnable
        m.callback = r;
        return m;
    }    

    public final boolean sendMessage(@NonNull Message msg) {
        return sendMessageDelayed(msg, 0);
    }

可以看到其实两种方式最终都是利用的sendMessageDelayed方法.区别是传递的Message对象.

  • Handler.post(Runnbale runnable)

    它的Message中callback为runnable

  • Handler.sendMessage(Message msg)

    一般传递给它的message并不会指定callback,也就是说callback为null

那么Message是否带callback有何影响?这里要说一下Message最终分发的逻辑:

   // 优先处理Message中自带的callback,其次就是Handler中的callback,再才是handleMessage方法 
   public void dispatchMessage(@NonNull Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }

    private static void handleCallback(Message message) {
        message.callback.run();
    }

    // 设置给Handler的Callback
    public interface Callback {
        // 返回true,则处理完毕;返回false,则后面handleMessage()会继续处理
        boolean handleMessage(@NonNull Message msg);
    }

    // Handler中需要重写的方法
    public void handleMessage(@NonNull Message msg) {
    }    

也就是一个执行优先级的区别了.

Handler与线程

ActivityThread中已经为主线程准备好了Looper,即已经创建了Looper,并开启了循环:

Looper.prepareMainLooper();

ActivityThread thread = new ActivityThread();
thread.attach(false, startSeq);

Looper.loop();

这就相当于为主线程暴露了一个消息传递的入口,在其他线程里可以创建一个主线程Handler,向主线程中传递消息,或者切换到主线程执行任务.

既然主线程可以这么做,其他线程也可以这么做,如果需要为线程开启消息循环机制,需要两步:

Looper.prepare();
Looper.loop();

对于主线程之外的线程的消息传递机制可以参考一下HandlerThread的实现方式,也可以直接使用HandlerThread:

    @Override
    public void run() {
        mTid = Process.myTid();
        Looper.prepare();
        synchronized (this) {
            mLooper = Looper.myLooper();
            notifyAll();
        }
        Process.setThreadPriority(mPriority);
        onLooperPrepared();
        Looper.loop();
        mTid = -1;
    }

子线程可以被销毁,但是主线程在APP生命周期中一直存在着,因此主线程的Looper其实是不允许停止的:

    public static void prepareMainLooper() {
        // 这里传递的false参数,不允许停止
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }
    }

Message 与 MessageQueue

Message本身的对象池复用不必多少,需要说的是Message对象实际上是一个单链表,Message中的next字段就是下一个Message节点.

我们都知道LinkedList可以当作一个队列使用,这里Message其实就是一个链表,自然也可以当作一个队列使用.

而MessageQueue则封装了Message的入队出队操作,还需要注意的是这并不是完全意义上的先进先出队列,因为Message中存在延时时间,所以入队的时候需要根据执行时间插入到指定的节点.

既然大体上还算是一个队列,那么肯定有一个入队和出队的过程:

  • 入队:

        boolean enqueueMessage(Message msg, long when) {
            if (msg.target == null) {
                throw new IllegalArgumentException("Message must have a target.");
            }
    
            synchronized (this) {
                if (msg.isInUse()) {
                    throw new IllegalStateException(msg + " This message is already in use.");
                }
    
                if (mQuitting) {
                    IllegalStateException e = new IllegalStateException(
                            msg.target + " sending message to a Handler on a dead thread");
                    Log.w(TAG, e.getMessage(), e);
                    msg.recycle();
                    return false;
                }
    
                msg.markInUse();
                msg.when = when;
                Message p = mMessages;
                boolean needWake;
                if (p == null || when == 0 || when < p.when) {
                    // New head, wake up the event queue if blocked.
                    msg.next = p;
                    mMessages = msg;
                    needWake = mBlocked;
                } else {
                    needWake = mBlocked && p.target == null && msg.isAsynchronous();
                    Message prev;
                    for (;;) {
                        prev = p;
                        p = p.next;
                        if (p == null || when < p.when) {
                            break;
                        }
                        if (needWake && p.isAsynchronous()) {
                            needWake = false;
                        }
                    }
                    msg.next = p; // invariant: p == prev.next
                    prev.next = msg;
                }
    
                // We can assume mPtr != 0 because mQuitting is false.
                if (needWake) {
                    nativeWake(mPtr);
                }
            }
            return true;
        }
  • 出队:

    这里既然是一个消息循环,那么应该是循环开始的时候会不断从队列中取出Message,这里是Looper.loop():

        public static void loop() {
            final Looper me = myLooper();
            if (me == null) {
                throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
            }
            me.mInLoop = true;
            final MessageQueue queue = me.mQueue;
            Binder.clearCallingIdentity();
            final long ident = Binder.clearCallingIdentity();
    
            // 循环
            for (;;) {
                // 取出消息
                Message msg = queue.next(); // might block
                if (msg == null) {
                    // No message indicates that the message queue is quitting.
                    return;
                }
    
                long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
                try {
                    // 分发消息
                    msg.target.dispatchMessage(msg);
    
                } catch (Exception exception) {
    
                } finally {
    
                }
                msg.recycleUnchecked();
            }
        }

    这里其实用的MessageQueue的next方法取出消息的:

        Message next() {
    
            final long ptr = mPtr;
            if (ptr == 0) {
                return null;
            }
    
            int pendingIdleHandlerCount = -1; // -1 only during first iteration
            int nextPollTimeoutMillis = 0;
            for (;;) {
                if (nextPollTimeoutMillis != 0) {
                    Binder.flushPendingCommands();
                }
    
                nativePollOnce(ptr, nextPollTimeoutMillis);
    
                synchronized (this) {
                    // Try to retrieve the next message.  Return if found.
                    final long now = SystemClock.uptimeMillis();
                    Message prevMsg = null;
                    Message msg = mMessages;
                    if (msg != null && msg.target == null) {
                        // Stalled by a barrier.  Find the next asynchronous message in the queue.
                        do {
                            prevMsg = msg;
                            msg = msg.next;
                        } while (msg != null && !msg.isAsynchronous());
                    }
                    if (msg != null) {
                        if (now < msg.when) {
                            // Next message is not ready.  Set a timeout to wake up when it is ready.
                            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                        } else {
                            // Got a message.
                            mBlocked = false;
                            if (prevMsg != null) {
                                prevMsg.next = msg.next;
                            } else {
                                mMessages = msg.next;
                            }
                            msg.next = null;
                            if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                            msg.markInUse();
                            return msg;
                        }
                    } else {
                        // No more messages.
                        nextPollTimeoutMillis = -1;
                    }
    
                    // Process the quit message now that all pending messages have been handled.
                    if (mQuitting) {
                        dispose();
                        return null;
                    }
    
                    // If first time idle, then get the number of idlers to run.
                    // Idle handles only run if the queue is empty or if the first message
                    // in the queue (possibly a barrier) is due to be handled in the future.
                    if (pendingIdleHandlerCount < 0
                            && (mMessages == null || now < mMessages.when)) {
                        pendingIdleHandlerCount = mIdleHandlers.size();
                    }
                    if (pendingIdleHandlerCount <= 0) {
                        // No idle handlers to run.  Loop and wait some more.
                        mBlocked = true;
                        continue;
                    }
    
                    if (mPendingIdleHandlers == null) {
                        mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                    }
                    mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
                }
    
                // Run the idle handlers.
                // We only ever reach this code block during the first iteration.
                for (int i = 0; i < pendingIdleHandlerCount; i++) {
                    final IdleHandler idler = mPendingIdleHandlers[i];
                    mPendingIdleHandlers[i] = null; // release the reference to the handler
    
                    boolean keep = false;
                    try {
                        keep = idler.queueIdle();
                    } catch (Throwable t) {
                        Log.wtf(TAG, "IdleHandler threw exception", t);
                    }
    
                    if (!keep) {
                        synchronized (this) {
                            mIdleHandlers.remove(idler);
                        }
                    }
                }
    
                // Reset the idle handler count to 0 so we do not run them again.
                pendingIdleHandlerCount = 0;
    
                // While calling an idle handler, a new message could have been delivered
                // so go back and look again for a pending message without waiting.
                nextPollTimeoutMillis = 0;
            }
        }

    其实也是一个根据时间查找节点的过程.

再补充一点,MessageQueue实际上是在Looper实例化的时候创建的,并且Looper持有了MessageQueue的引用,Handler中使用的就是Looper传递过去的MessageQueue.

开发者可以使用Looper.myQueue()获取对应的MessageQueue对象.

IdleHandler

    // 消息队列为空时的回调接口
    public static interface IdleHandler {
        // 当前MessageQueue为空,即阻塞时,执行这个回调方法
        // 返回true,则会一直保持这个IdleHandler,会重复执行,返回fasle则只执行一次
        boolean queueIdle();
    }

IdleHandler时MessageQueue中的一个内部接口.当消息队列阻塞时,即消息为空,或当前没有要执行的消息的时候就会执行这个IdleHandler.

一般添加IdleHandler都是使用:

Looper.myQueue().addIdleHandler {
           doSomething()
           true
}

一般用在性能优化场景和一些不抢占主线程任务资源但是也有机会执行的场景:

  • ActivityThread中的GC操作
  • LeakCanary中内存泄漏提示Toast的显示
  • Android的严格模式
  • 各种Instrumentation

同步消息/异步消息/SyncBarrier(同步屏障)

    public void setAsynchronous(boolean async) {
        if (async) {
            flags |= FLAG_ASYNCHRONOUS;
        } else {
            flags &= ~FLAG_ASYNCHRONOUS;
        }
    }

Message 中可以通过调用setAsynchronous()方法来设置是否为异步消息.实际使用的时候,需要使用MessageCompat.

同步消息和异步消息的发送其实都是通过sendMessageAtTime发送的,这之后都会执行到:

    private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
            long uptimeMillis) {
        // 这里赋值了target为当前Handler   
        msg.target = this;
        msg.workSourceUid = ThreadLocalWorkSource.getUid();

        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

而SyncBarrier则是在MessageQueue中通过postSyncBarrier(long when)添加的:

    private int postSyncBarrier(long when) {
        // Enqueue a new sync barrier token.
        // We don't need to wake the queue because the purpose of a barrier is to stall it.
        synchronized (this) {
            final int token = mNextBarrierToken++;
            final Message msg = Message.obtain();
            msg.markInUse();
            msg.when = when;
            msg.arg1 = token;

            Message prev = null;
            Message p = mMessages;
            if (when != 0) {
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                msg.next = p;
                mMessages = msg;
            }
            return token;
        }
    }

可以看到,这里的Message实际上没有给target赋值,因此target = null.也就是说SyncBarrier是target = null的Message.

当loop的时候如果遇到SyncBarrier,则会去寻找下一个异步消息,因此这里SyncBarrier的作用是拦截同步消息,通过异步消息.

在MessageQueue的next方法中有这么一段就是处理这部分逻辑的:

// Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }

SyncBarrier一般都是成对使用的:

  • private int postSyncBarrier(long when)
  • public void removeSyncBarrier(int token)

不过目前这两个方法在应用层都无法使用.具体的使用场景可以参考ViewRootImpl:

    void scheduleTraversals() {
        if (!mTraversalScheduled) {
            mTraversalScheduled = true;
            mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
            mChoreographer.postCallback(
                    Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
            notifyRendererOfFramePending();
            pokeDrawLockIfNeeded();
        }
    }

    void unscheduleTraversals() {
        if (mTraversalScheduled) {
            mTraversalScheduled = false;
            mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
            mChoreographer.removeCallbacks(
                    Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
        }
    }

    void doTraversal() {
        if (mTraversalScheduled) {
            mTraversalScheduled = false;
            mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);

            performTraversals();

            if (mProfile) {
                Debug.stopMethodTracing();
                mProfile = false;
            }
        }
    }

内存泄漏问题

Handler的内存泄漏问题是一个老生常谈的问题了,这里就不多说了,一般的场景有:

  • 内部类/匿名内部类

    内部类/匿名内部类会隐私持有外部对象,容易造成内存泄漏,因此需要使用静态内部类

  • Callback问题

    在Activity销毁或者任务执行完之后需要removeCallback,不然也有可能会导致内存泄漏

为什么主线程不会因为Looper.loop()而阻塞?

前面的loop循环有说过,里面有一个无限循环,但是为什么不会因此阻塞呢,导致无法处理其他的主线程任务.

这里需要明确一点:

  • 主线程任务都会通过Message消息传递机制执行,也就是说最终都会在loop()中获取到并执行

    Android Activity/Service的各种回调,比如onCreate()/onResume()等都是ActivityThread中H中有对应的消息类型进行处理,其实都是在消息队列中,并不会有什么其他类型的主线程任务会有别的方式去执行.

但是loop()中的无限循环,会很耗费资源吗?当没有任务时,是简单的sleep吗,那是如何唤醒的呢?

关键之处就在MessageQueue中next方法中的一个native方法调用:

nativePollOnce(ptr, nextPollTimeoutMillis);

android_os_MessageQueue.cpp中有:

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // 这里的Looper是在Looper.h中定义的
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

可以看到这里其实用到的是Native层的LooperpoolOnce方法.

Native层的Looper支持监控文件描述符事件,还支持callback,并且其内部使用的是epoll()方式.

Looper.cpp中有:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;

                if (outFd != nullptr) *outFd = fd;
                if (outEvents != nullptr) *outEvents = events;
                if (outData != nullptr) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
            if (outFd != nullptr) *outFd = 0;
            if (outEvents != nullptr) *outEvents = 0;
            if (outData != nullptr) *outData = nullptr;
            return result;
        }

        // 这里
        result = pollInner(timeoutMillis);
    }
}

int Looper::pollInner(int timeoutMillis) {
    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    // epoll_wait()系统调用
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}

关于epoll机制,可以查看https://zh.wikipedia.org/wiki/Epoll


文章作者: 姜康
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 姜康 !
评论
  目录