버튼 수집상

[Kotlin] RxJava의 mergeDelayError를 Coroutine Flow로 만들기 Coroutine Flow merge 본문

TIL - Kotlin

[Kotlin] RxJava의 mergeDelayError를 Coroutine Flow로 만들기 Coroutine Flow merge

cocokaribou 2024. 1. 9. 13:48

예전에 stackoverflow에 남겼던 질문을 여기 지면에 정리

https://stackoverflow.com/questions/72925716/coroutine-equivalent-of-flowable-mergedelayerror

 

Coroutine equivalent of Flowable.mergeDelayError

I'm in the middle of migrating Rxjava based project to Coroutine Flow. I need to merge multiple api calls and if one of them fails, I still want it to proceed and do the job. Here's the original co...

stackoverflow.com

 

RxJava의 mergeDelayError는 여러 suspend 함수를 머지해서 호출했을 때 함수 하나가 에러나도 나머지 함수를 마저 콜하고 에러를 딜레이시킨다.

 

Coroutine Flow로 위와 비슷하게 여러 suspend 함수들을 merge 해보겠다.

예시 코드는 여기 레포지토리를 참고하길 바란다.

api의 request/response 로그를 인앱 UI로 보여주는 앱이다.

 

 

레트로핏 인터페이스 ApiService.kt

val service by lazy { ApiService.service() }

interface ApiService {

    @GET("posts")
    suspend fun mockData1(): List<Data>
    @GET("failingApi")
    suspend fun failingData(): List<Data>
    @GET("albums")
    suspend fun mockData2(): List<Data>
    @GET("photos")
    suspend fun mockData3(): List<Data>
    @GET("todos")
    suspend fun mockData4(): List<Data>
    @GET("users")
    suspend fun mockData5(): List<Data>

    companion object {
        private val logging: HttpLoggingInterceptor = HttpLoggingInterceptor()
            .setLevel(
                if (BuildConfig.DEBUG) HttpLoggingInterceptor.Level.BASIC
                else HttpLoggingInterceptor.Level.NONE
            )

        private val interceptor = Interceptor { chain ->
            val original = chain.request()
            val requestBuilder: Request.Builder = original.newBuilder()

            val request: Request = requestBuilder.build()
            CustomLog.a(request) // request 로그뷰

            val response = chain.proceed(request)
            CustomLog.a(response) // response 로그뷰

            return@Interceptor response
        }

        private val dispatcher = Dispatcher().apply {
            maxRequests = 8
            maxRequestsPerHost = 8
        }

        fun service() : ApiService {
            val client = OkHttpClient.Builder()
                .addInterceptor(interceptor)
                .addInterceptor(logging)
                .dispatcher(dispatcher)
                .connectTimeout(2000, TimeUnit.SECONDS)
                .readTimeout(2000, TimeUnit.SECONDS)
                .build()

            return Retrofit.Builder()
                .baseUrl("https://jsonplaceholder.typicode.com/")
                .addConverterFactory(GsonConverterFactory.create())
                .client(client)
                .build()
                .create(ApiService::class.java)
        }
    }
}

 

레포지토리 MainRepository.kt

class MainRepository {
    suspend fun requestMockData1(): Flow<Result<List<Data>>> {
        return flow {
            val data = service.mockData1()
            emit(Result.success(data))
        }.retryWhen { cause, attempt ->
            return@retryWhen attempt < 2 && cause is java.lang.Exception
        }.catch {
            emit(Result.failure(Throwable("mockData1")))
        }.flowOn(Dispatchers.IO)
    }
//...    
}

뷰모델 MainViewModel.kt

class MainViewModel : ViewModel() {
    private val repository by lazy { MainRepository() }

    val onApiComplete = MutableLiveData<String>()
    fun requestApis() {
        viewModelScope.launch {
            with(repository) {
                merge(
                    requestMockData1(),
                    requestFailingData(),
                    requestMockData2(),
                    requestMockData3(),
                    requestMockData4(),
                    requestMockData5()
                )
                    .catch {}
                    .onEach {
                        it.fold(
                            onSuccess = {},
                            onFailure = {}
                        )
                    }
                    .onCompletion {
                        onApiComplete.postValue("merged api complete!")
                    }
                    .launchIn(viewModelScope)
            }
        }
    }
}

 

액티비티 MainActivity.kt

class MainActivity : AppCompatActivity() {

    private val viewModel by viewModels<MainViewModel>()
    private val binding by lazy { ActivityMainBinding.inflate(layoutInflater) }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        CustomLog.d("app start!")

        viewModel.requestApis()
        CustomLog.d("debug log!")
        CustomLog.w("warning log!")
        CustomLog.i("info log!")
        setContentView(binding.root)

        initUI()
        initObserve()

    }

    private fun initUI() = with(binding) {
        logview.initBottomTab()

        btn.setOnClickListener {
            logview.toggleLogView()
        }
        swipe.setOnRefreshListener {
            CustomLog.d("refresh api!")
            viewModel.requestApis()
        }
    }

    private fun initObserve() {
        CustomLog.logLiveData.observe(this) {
            binding.logview.submitList(it)
        }
        viewModel.onApiComplete.observe(this) { msg ->
            binding.swipe.isRefreshing = false
            CustomLog.d(msg)
        }
    }
}

 

그러면 api 호출함수를 merge로 불렀을 때 아래처럼 불리는 것을 볼 수 있다.

failingApi 가 404로 에러로 떨어져도 다른 api를 마저 호출하는 것을 볼 수 있다.

에러가 맨 뒷순서로 딜레이 되는 것은 아니다.

 

728x90