audiotrack的Marker/PeriodPosition/flush流程(mode_stream)

简述

之前我们看到代码中有一些marker、period、flush等相关的,但是还没搞清楚它们的作用,这篇文章,我们一起来看下吧。

正文

代码中的一些变量的存在甚至线程的存在,它们都是有其存在的意义的,为了能够弄清楚这些变量,我们有必要来研究各个函数的实现。

markerPosition是只会触发一次回调的位置
periodposition是周期性的触发回调,这个位置值就是每次需要累加的周期值。

markerPosition/PeriodPosition

markerPosition只通知你一次,PeriodPosition是隔指定帧数通知。
但是这个通知是不太准确的,因为这个跟底层一次性消费的数量有一定关系的
对于PeriodPosition,如果你给100,但是底层一次获取300,那你会一次性收到三次的回调,而且每次回调去获取position都是300.
对于markerPosition,也是存在一样的情况,只是它只会通知一次。

用例代码

java层的代码

mAudioTrack.setNotificationMarkerPosition(500);
mAudioTrack.setPositionNotificationPeriod(200);
mAudioTrack.setPlaybackPositionUpdateListener(mPositionUpdate);
AudioTrack.OnPlaybackPositionUpdateListener mPositionUpdate = new AudioTrack.OnPlaybackPositionUpdateListener() {

    @Override
    public void onMarkerReached(AudioTrack track) {
        Log.d(TAG, "onMarkerReached " + track.getPlaybackHeadPosition());
    }

    @Override
    public void onPeriodicNotification(AudioTrack track) {
        Log.d(TAG, "onPeriodicNotification " + track.getPlaybackHeadPosition());
    }
};

上面是用例代码,就是创建监听器,然后注册到audiotrack中。

AudioTrackThread相关部分分析

要看这两个Position的实现之前,我们先来看下AudioTrackThread的部分代码。
前面我们在看audiotrack的创建、play的流程中就有看到这个Thread是会被跑起来了。
play中是通过调用resume来让Thread跑起来的

void AudioTrack::AudioTrackThread::resume()
{
    AutoMutex _l(mMyLock);
    // 先确定要绕过内部pause
    mIgnoreNextPausedInt = true;
    if (mPaused || mPausedInt) {
        // 取消pause状态
        mPaused = false;
        mPausedInt = false;
        // 唤醒等待的线程
        mMyCond.signal();
    }
}

上面简单的完成了唤醒线程的操作,但是前提必须是线程的状态处于pause或者是内部的pause状态
上面的几个变量我们在线程的threadLoop中再讲解:

bool AudioTrack::AudioTrackThread::threadLoop()
{
    {
        AutoMutex _l(mMyLock);
        // 线程刚创建的时候,mPaused是为true的,所以会阻塞在这,等resume的时候,会被设置为false,
        // 并且唤醒wait。mPaused这个变量表示的是外部pause,就是通过api来控制的
        if (mPaused) {
            mMyCond.wait(mMyLock);
            // 直接返回了。调用该函数的地方会去检测exitPending()
            return true;
        }
        // mIgnoreNextPausedInt这个变量存在的意义是为了绕过内部的pause
        if (mIgnoreNextPausedInt) {
            mIgnoreNextPausedInt = false;
            mPausedInt = false;
        }
        // mPausedInt这个变量相对于上面的mPaused,它是线程内部的控制等待还是继续的。
        // 因为某些场景我们是不需要该线程一直在轮训的。
        if (mPausedInt) {
            if (mPausedNs > 0) {
                (void) mMyCond.waitRelative(mMyLock, mPausedNs);
            } else {
                mMyCond.wait(mMyLock);
            }
            mPausedInt = false;
            // 唤醒之后,也是立即返回
            return true;
        }
    }
    if (exitPending()) {
        return false;
    }
    // 这个函数后面详解
    nsecs_t ns = mReceiver.processAudioBuffer();
    switch (ns) {
    case 0:
        return true;
    // 如果检测到当前track不处在ACTIVE,则进入内部pause,pauseInternal函数只是设置变量和时间
    // 会在开头的地方发生waiting
    case NS_INACTIVE:
        pauseInternal();
        return true;
    // 这个线程就算over了。
    case NS_NEVER:
        return false;
    // 时间上没有强制,那就随便给一个最大的,然后进入内部pause了。
    case NS_WHENEVER:
        // Event driven: call wake() when callback notifications conditions change.
        ns = INT64_MAX;
        // fall through
    default:
        pauseInternal(ns);
        return true;
    }
}

从这个函数中,我们知道了循环的逻辑:

  1. 当前是否pause,是的话waiting,被唤醒之后直接返回,进入下一次循环
  2. 获取数据,同时返回一个时间
  3. 根据时间信息来确定是退出线程还是内部pause,或者直接进行下一次的循环

pause有分内部pause和外部pause,内部pause的话,主要是根据目前内部状态来确定是否需要进行pause,而外部pause的话,则是别人调用audiotrack的api来触发的。

接下来我们来看函数processAudioBuffer,我们去掉了很多跟这次分析无关的部分

nsecs_t AudioTrack::processAudioBuffer()
{
    if (mCblk == NULL) {
        return NS_NEVER;
    }

    mLock.lock();

    // Can only reference mCblk while locked
    int32_t flags = android_atomic_and(
        ~(CBLK_UNDERRUN | CBLK_LOOP_CYCLE | CBLK_LOOP_FINAL | CBLK_BUFFER_END), &mCblk->mFlags);

    if (flags & CBLK_INVALID) {
        // 对于offloaded tracks函数restoreTrack_l只是更新sequence和请缓存。这边我们不做退出,而是
        // 通知上层去recreate这个track
        if (!isOffloadedOrDirect_l() || (mSequence == mObservedSequence)) {
            status_t status __unused = restoreTrack_l("processAudioBuffer");
        }
    }
    // waitStreamEnd这个变量的存在就是为STATE_STOPPING存在的
    bool waitStreamEnd = mState == STATE_STOPPING;
    // 当前是否处于ACTIVE状态。
    bool active = mState == STATE_ACTIVE;

    // Manage underrun callback, must be done under lock to avoid race with releaseBuffer()
    bool newUnderrun = false;
    if (flags & CBLK_UNDERRUN) {
        if (!mInUnderrun) {
            mInUnderrun = true;
            newUnderrun = true;
        }
    }

    // 获取当前的server的位置
    size_t position = updateAndGetPosition_l();

    // marker callback的管理
    bool markerReached = false;
    size_t markerPosition = mMarkerPosition;
    // 从这边就可以看出marker只会触发一次回调,一旦触发mMarkerReached将被设置为true。
    if (!mMarkerReached && (markerPosition > 0) && (position >= markerPosition)) {
        mMarkerReached = markerReached = true;
    }

    // 计算需要触发多少次的callback
    size_t newPosCount = 0;
    size_t newPosition = mNewPosition;
    size_t updatePeriod = mUpdatePeriod;
    if (updatePeriod > 0 && position >= newPosition) {
        newPosCount = ((position - newPosition) / updatePeriod) + 1;
        mNewPosition += updatePeriod * newPosCount;
    }

    uint32_t sampleRate = mSampleRate;
    float speed = mPlaybackRate.mSpeed;
    const uint32_t notificationFrames = mNotificationFramesAct;
    if (mRefreshRemaining) {
        mRefreshRemaining = false;
        mRemainingFrames = notificationFrames;
        mRetryOnPartialBuffer = false;
    }
    size_t misalignment = mProxy->getMisalignment();
    uint32_t sequence = mSequence;
    sp<AudioTrackClientProxy> proxy = mProxy;

    mLock.unlock();

    // get anchor time to account for callbacks.
    const nsecs_t timeBeforeCallbacks = systemTime();

    // 开始触发各种类型的回调了
    if (newUnderrun) {
        mCbf(EVENT_UNDERRUN, mUserData, NULL);
    }
    while (loopCountNotifications > 0) {
        mCbf(EVENT_LOOP_END, mUserData, NULL);
        --loopCountNotifications;
    }
    if (flags & CBLK_BUFFER_END) {
        mCbf(EVENT_BUFFER_END, mUserData, NULL);
    }
    // 我们关注的~
    if (markerReached) {
        mCbf(EVENT_MARKER, mUserData, &markerPosition);
    }
    // 我们关注的~
    while (newPosCount > 0) {
        size_t temp = newPosition;
        mCbf(EVENT_NEW_POS, mUserData, &temp);
        newPosition += updatePeriod;
        newPosCount--;
    }
    if (mObservedSequence != sequence) {
        mObservedSequence = sequence;
        mCbf(EVENT_NEW_IAUDIOTRACK, mUserData, NULL);
        // for offloaded tracks, 我们需要等待上层来创建
        if (isOffloadedOrDirect()) {
            return NS_INACTIVE;
        }
    }

    // 如果当前state不为active,需要等待到别人再触发该线程唤醒
    if (!active) {
        return NS_INACTIVE;
    }

    // 通过帧的位置来计算需要休眠的时间长度。从下面的计算中,我们就可以知道,这个也是个估值。
    uint32_t minFrames = ~0;
    if (!markerReached && position < markerPosition) {
        minFrames = markerPosition - position;
    }
    if (loopPeriod > 0 && loopPeriod < minFrames) {
        // loopPeriod is already adjusted for actual position.
        minFrames = loopPeriod;
    }
    if (updatePeriod > 0) {
        minFrames = min(minFrames, uint32_t(newPosition - position));
    }

    // If > 0, poll periodically to recover from a stuck server.  A good value is 2.
    static const uint32_t kPoll = 0;
    if (kPoll > 0 && mTransfer == TRANSFER_CALLBACK && kPoll * notificationFrames < minFrames) {
        minFrames = kPoll * notificationFrames;
    }

    // This "fudge factor" avoids soaking CPU, and compensates for late progress by server
    static const nsecs_t kWaitPeriodNs = WAIT_PERIOD_MS * 1000000LL;
    const nsecs_t timeAfterCallbacks = systemTime();

    // Convert frame units to time units
    nsecs_t ns = NS_WHENEVER;
    // 如果minFrames没被修改过的话,那ns就为NS_WHENEVER,如果被修改过就发生改变
    if (minFrames != (uint32_t) ~0) {
        ns = framesToNanoseconds(minFrames, sampleRate, speed) + kWaitPeriodNs;
        ns -= (timeAfterCallbacks - timeBeforeCallbacks);  // account for callback time
        if (ns < 0) ns = 0;
    }

    // If not supplying data by EVENT_MORE_DATA, then we're done
    // 如果不是采用TRANSFER_CALLBACK,就在这边返回了。对于我们java下面的stream的track就到这边了。
    if (mTransfer != TRANSFER_CALLBACK) {
        return ns;
    }
    ...
}

对于该函数我们需要关注的:

  1. 更新了server position之后,就开始进行markerposition和periodposition的判断和计算
  2. 然后触发回调
  3. 计算下次触发markerposition或者periodposition所需要的时间
  4. 如果transfer不为callback类型,我们就直接返回这个时间。那在这个时间段内,线程就可以安心的去休息了

接下来我们来看setPositionUpdatePeriodsetMarkerPosition

status_t AudioTrack::setPositionUpdatePeriod(uint32_t updatePeriod)
{
    // 不可用于offload和direct
    if (mCbf == NULL || isOffloadedOrDirect()) {
        return INVALID_OPERATION;
    }

    AutoMutex lock(mLock);
    // 计算下一次的位置和更新变量mUpdatePeriod
    mNewPosition = updateAndGetPosition_l() + updatePeriod;
    mUpdatePeriod = updatePeriod;

    sp<AudioTrackThread> t = mAudioTrackThread;
    if (t != 0) {
        // 如果线程在休眠,那我们就唤醒它。让它也赶紧更新确认下。
        t->wake();
    }
    return NO_ERROR;
}

status_t AudioTrack::setMarkerPosition(uint32_t marker)
{
    if (mCbf == NULL || isOffloadedOrDirect()) {
        return INVALID_OPERATION;
    }

    AutoMutex lock(mLock);
    // 更新marker的位置和mMarkerReached设置为false
    mMarkerPosition = marker;
    mMarkerReached = false;

    sp<AudioTrackThread> t = mAudioTrackThread;
    if (t != 0) {
        // 如果线程在休眠,那我们就唤醒它。让它也赶紧更新确认下。
        t->wake();
    }
    return NO_ERROR;
}

上面两函数的逻辑基本一致,只是重置下相关变量,然后唤醒线程。
我们来看下wake的实现

void AudioTrack::AudioTrackThread::wake()
{
    AutoMutex _l(mMyLock);
    // 当前必须不能为pause
    if (!mPaused) {
        mIgnoreNextPausedInt = true;
        // 必须是处在内部pause的场景,并且时间是大于0的
        if (mPausedInt && mPausedNs > 0) {
            // 更新内部pause状态,然后唤醒线程
            mPausedInt = false;
            mMyCond.signal();
        }
    }
}

wake触发生效的前提是,当前Thread不能为pause,而是处于内部pause并且必须保证休眠的时间大于0,这个时候才会去唤醒Thread。

我们这边在总结下AudioTrackThread的逻辑:

在尚未设置markerPosition和periodposition之前,AudioTrackThread一直处在running的状态,只是休眠的时间比较长,当设置了markerPosition或者periodposition之后,线程就会被从休眠中唤醒,然后去计算下一次唤醒的时间,一旦时间到了,就会触发回调。

这个就是在transfer不为callback场景下,使用到AudioTrackThread的地方了。

flush

flush的目的是将已经写到mcblk buffer中的数据进行清空,其实就是重置下front到rear。

但是实现上还是有一些搞不是很清楚。

可以直接从native这边开始看:

void AudioTrack::flush()
{
    // 不适用与share memory
    if (mSharedBuffer != 0) {
        return;
    }
    AutoMutex lock(mLock);
    // 该track不能是STATE_ACTIVE或者STATE_FLUSHED。其实就是要PAUSE或者STOP的场景
    if (mState == STATE_ACTIVE || mState == STATE_FLUSHED) {
        return;
    }
    flush_l();
}

这个函数就是过滤一些状态,从这边看出来,flush生效必须state不为active和flush。

void AudioTrack::flush_l()
{
    ALOG_ASSERT(mState != STATE_ACTIVE);

    // clear playback marker and periodic update counter
    // 将marker和periodposition都设置为清空
    mMarkerPosition = 0;
    mMarkerReached = false;
    mUpdatePeriod = 0;
    mRefreshRemaining = true;
    // 状态更新为STATE_FLUSHED
    mState = STATE_FLUSHED;
    mReleased = 0;
    // 如果是offload的话,去中断obtainBuffer的操作
    if (isOffloaded_l()) {
        mProxy->interrupt();
    }
    // 代理去flush,IAudioTrack去flush
    mProxy->flush();
    mAudioTrack->flush();
}
  1. 会对marker和periodposition进行重置,这个需要注意下,目前也不确定其用意。这样上层得重新再设置值。
  2. 重置下state为flush
  3. 调用代理的flush和IAudioTrack的flush

    void AudioTrackClientProxy::flush()
    {

    // This works for mFrameCountP2 <= 2^30
    // 两倍的P2帧大小
    size_t increment = mFrameCountP2 << 1;
    size_t mask = increment - 1;
    audio_track_cblk_t* cblk = mCblk;
    // mFlush is 32 bits concatenated as [ flush_counter ] [ newfront_offset ]
    // Should newFlush = cblk->u.mStreaming.mRear?  Only problem is
    // if you want to flush twice to the same rear location after a 32 bit wrap.
    int32_t newFlush = (cblk->u.mStreaming.mRear & mask) |
                        ((cblk->u.mStreaming.mFlush & ~mask) + increment);
    android_atomic_release_store(newFlush, &cblk->u.mStreaming.mFlush);
    

    }

上面是clientProxy的flush函数,主要是为了拼接出flush的值。
逻辑:

  1. P2的两倍 P2是2的整次幂,如2^3 = 8 左移就是 2^4 = 16
  2. mask两倍p2的mask, 如果P2为2^3 那这个值为0111
  3. flush的拼接是 rear & 0111 | ((flush & 1000) + 2^4) ,不是很明白为什么这么处理,为什么不直接拿mRear来给flush就好了呢?

下面是IAudioTrack端实现的flush

void AudioFlinger::PlaybackThread::Track::flush()
{
    ALOGV("flush(%d)", mName);
    sp<ThreadBase> thread = mThread.promote();
    if (thread != 0) {
        Mutex::Autolock _l(thread->mLock);
        PlaybackThread *playbackThread = (PlaybackThread *)thread.get();

        if (isOffloaded()) {
            ...
        } else {
            // 对状态再做一次过滤
            if (mState != STOPPING_1 && mState != STOPPING_2 && mState != STOPPED &&
                    mState != PAUSED && mState != PAUSING && mState != IDLE && mState != FLUSHED) {
                return;
            }
            // 做状态切换
            mState = FLUSHED;
            // 如果当前track正在stop或者pause,则不要进行reset。如果是stop的场景,reset将会在
            // prepareTracks_l中被调用
            if (isDirect()) {
                mFlushHwPending = true;
            }
            // 如果已经被移除active list,则进行reset操作。reset函数我们在之前有讨论过
            // pause之后调用这个flush,会触发reset的操作。
            if (playbackThread->mActiveTracks.indexOf(this) < 0) {
                reset();
            }
        }
        // 避免flush丢失 如果某个track被flush之后在mixer thread运行之前它就resume了。
        // 这个对于offload非常重要,因为硬件buffer可能持有大量的音频数据
        playbackThread->broadcast_l();
    }
}

上面函数主要是做了:

  1. 在对状态做一遍过滤
  2. 设置状态为flush
  3. 如果track已经不属于active track,那就将该track进行reset
  4. 发送广播

对于flush,我们想要看到效果的话,就得等继续播放才会体现出来,所以我们来看下serverproxy获取数据的代码:

status_t ServerProxy::obtainBuffer(Buffer* buffer, bool ackFlush)
{
    {
    audio_track_cblk_t* cblk = mCblk;
    int32_t front;
    int32_t rear;
    if (mIsOut) {
        int32_t flush = cblk->u.mStreaming.mFlush;
        rear = android_atomic_acquire_load(&cblk->u.mStreaming.mRear);
        front = cblk->u.mStreaming.mFront;
        if (flush != mFlush) {
            // effectively obtain then release whatever is in the buffer
            const size_t overflowBit = mFrameCountP2 << 1;
            const size_t mask = overflowBit - 1;
            // newFront 是取front的头+flush的尾
            // flush    是取flush的头+rear的尾
            // 所以正常情况下,rear和newFront是相等的。
            int32_t newFront = (front & ~mask) | (flush & mask);
            ssize_t filled = rear - newFront;
            if (filled >= (ssize_t)overflowBit) {
                // front and rear offsets span the overflow bit of the p2 mask
                // so rebasing newFront on the front offset is off by the overflow bit.
                // adjust newFront to match rear offset.
                newFront += overflowBit;
                filled -= overflowBit;
            }
            // Rather than shutting down on a corrupt flush, just treat it as a full flush
            if (!(0 <= filled && (size_t) filled <= mFrameCount)) {
                newFront = rear;
            }
            mFlush = flush;
            // 将front进行重新赋值
            android_atomic_release_store(newFront, &cblk->u.mStreaming.mFront);
            // There is no danger from a false positive, so err on the side of caution
            // 马上去唤醒客户端填充数据
            if (true /*front != newFront*/) {
                int32_t old = android_atomic_or(CBLK_FUTEX_WAKE, &cblk->mFutex);
                if (!(old & CBLK_FUTEX_WAKE)) {
                    (void) syscall(__NR_futex, &cblk->mFutex,
                            mClientInServer ? FUTEX_WAKE_PRIVATE : FUTEX_WAKE, 1);
                }
            }
            front = newFront;
        }
    } else {
        front = android_atomic_acquire_load(&cblk->u.mStreaming.mFront);
        rear = cblk->u.mStreaming.mRear;
    }
    ...
    return NO_INIT;
}

从上面代码看出来,就是重新获取到newFront,然后将其重新赋值给mFront。
然后唤醒client去填充数据。
从上面的几个对于filled空间大小的判断,感觉newFront并不一定总是等于mRear。。
这个部分目前还没能完全理清。。

flush的使用场景:
由于我们stop的过程会把mcblk buffer中的数据全都播放完,如果想直接当场结束的话,那可以采用先pause然后flush来取代stop的操作。