برنامه نویسی Reactive (واکنش پذیر) با RxAndroid در کاتلین

برنامه نویسی Reactive (واکنش پذیر) با RxAndroid در کاتلین
فهرست مقاله [نمایش]

    برنامه نویسی Reactive فقط یک API نیست. این یک الگوی کاملاً جدید برنامه نویسی است که مربوط به stream داده ها و گسترش تغییرات است.

    RxJava یک پیاده سازی Reactive برای آوردن این مفهوم به سیستم عامل اندروید است. برنامه های اندرویدی مکان مناسبی برای شروع کاوش در دنیای واکنش پذیر هستند. RxAndroid یک کتابخانه که رویدادهای ناهمگام UI را بسته بندی می کند تا به RxJava بیشتر شبیه باشد.

    در این آموزش برای برنامه نویسی واکنشی(Reactive) ، نحوه انجام موارد زیر را یاد خواهید گرفت:

    • مفاهیم برنامه نویسی واکنش پذیر را درک میکنید.

    • Observable را تعریف میکنید.

    • رویدادهای ناهمزمان مانند event های دکمه ای و تغییرات context متن را به ساختارهای observable تبدیل کنید.

    • observable ها را تغییر شکل دهید و فیلتر کنید.

    • استفاده از thread  های Rx در اجرای کد.

    • چندین observable را در یک جریان ترکیب کنید.

    • تمام observable های خود را به سازه های Flowable تبدیل کنید.

    • از RxJava  برای افزودن ویژگی مورد علاقه به برنامه استفاده کنید.

    خب امیدوارم که مفاهیم برایتان ساده باشد.برای مثال میخواهیم اپلیکیشن یافتن پنیر را پیاده سازی کنیم.

     

       سورس را از  اینجا  دانلود کنید

     

     

    شروع کنیم

    شما هم در CheeseActivity.kt و هم در CheeseAdapter.kt کار خواهید کرد. کلاس CheeseActivity از BaseSearchActivity را مشتق میشود. کد های BaseSearchActivity را بخوانید و میبینید که ویژگی هایی برای شما پیاده سازی کرده است که در زیر آنها را بررسی میکنیم:

    • showProgress(): تابعی برای نمایش نوار پیشرفت.
    • hideProgress(): تابعی برای پنهان کردن نوار پیشرفت.
    • showResult(result: List): متدی برای نمایش لیستی از پنیرها.
    • cheeseSearchEngine: فیلدی است که نمونه از CheeseSearchEngine را برای ما میسازد. این یک متد جستجو است که وقتی می خواهید پنیرها را جستجو کنید آن را فراخوانی میکنید. این متد یک درخواست جستجوی متن را میگیرد و لیستی از پنیرها مطابق با جستجوی ما را برمی گرداند.

    پروژه را روی دستگاه Android یا شبیه ساز خود اجرا کنید. شما باید یک صفحه جستجو با خالی را ببینید:

     

    البته قرار نیست برای همیشه اینجوری بماند شما به زودی توابع واکنشی را به برنامه اضافه خواهید کرد. قبل از ایجاد اولین  observable  ابتدا کمی تئوری صحبت کنیم.

     

    برنامه نویسی Reactive چیست؟

    در برنامه نویسی imperative  یک عبارت یک بار ارزیابی می شود و مقدار به یک متغیر اختصاص می یابد:

    var x = 2
    var y = 3
    var z = x * y // z is 6
    
    x = 10
    // z is still 6
    

     

    از طرف دیگر ، برنامه نویسی reactive همه چیز برمیگردد به پاسخ تغییرات مقدار.

    شما احتمالاً بعضی مواقع برنامه نویسی reactive را انجام داده اید  حتی اگر در آن زمان متوجه آن نبودید.

    • تعریف  cell value در spreadsheet مشابه تعریف متغیرها در برنامه نویسی imperative است.
    • تعریف عبارات cell در spreadsheet مانند تعریف و کار با observable ها در برنامه نویسی reactive است.

    spreadsheet زیر را که مثال بالا را پیاده سازی می کند در نظر بگیرید:

    spreadsheet به سلول B1 با مقدار 2 ، سلول B2 با مقدار 3 و سلول سوم ، B3 ، با عبارتی اختصاص می دهد که مقدار B1 را در مقدار B2 ضرب می کند. هنگامی که مقدار هر یک از اجزای ارجاع شده در عبارت تغییر می کند ، تغییر مشاهده می شود و عبارت به صورت اتوماتیک در B3 ارزیابی می شود:

    به بیان ساده تر ، ایده برنامه نویسی reactive وجود اجزایی است که تصویر بزرگتری را تشکیل می دهندکه می توان مشاهده کرد. و از برنامه خود بخواهید که هر زمان که اتفاق افتاد تغییرات را گوش کند و آنها را مصرف کند.

    تفاوت بین RxJava و RxKotlin

    همانطور که احتمالاً می دانید ، به لطف سازگاری زبان کاتلین با جاوا امکان استفاده از کتابخانه های جاوا در پروژه های کاتلین وجود دارد. اگر چنین است ، پس چرا RxKotlin در وهله اول ایجاد شده است؟ RxKotlin یک پکیج کاتلین پیرامون RxJava است ، که همچنین بسیاری از توابع مفید برای برنامه نویسی  reactive را فراهم می کند.

    در این مقاله ، ما به استفاده از RxJava خواهیم پرداخت زیرا درک مفاهیم اصلی این روش بسیار مهم است. با این حال ، هر آنچه یاد خواهید گرفت برای RxKotlin نیز صدق می کند.

    توجه: به فایل build.gradle و به ویژه وابستگی های پروژه نگاهی بیندازید. به جز کتابخانه های UI ، این مجموعه شامل بسته های RxKotlin و RxAndroid است. نیازی نیست که RxJava رادر اینجا مشخص کنیم چون که RxKotlin آن را دارد.

     

    RxJava Observable Contract

    RxJava از الگوی Observer استفاده می کند.

    در الگوی Observer ، شما اشیایی دارید که دو رابط کلیدی RxJava را اجرا می کنند: Observable و Observer. وقتی یک وضعیت Observable تغییر کند تمام اشیا Observer مشترک در آن مطلع می شوند.

    از جمله متد های موجود در اینترفیس Observable ، subscribe() است كه یك Observer برای شروع اشتراك تماس می گیرد.

    از آن نقطه اینترفیس Observer دارای سه متد است که در صورت لزوم  Observable آنها را فراخوانی می کند:

    • onNext(T value) آیتم جدیدی از نوع T را به Observer ارائه می دهد.
    • onComplete() به Observer اطلاع می دهد که Observable ارسال موارد را به پایان رسانده است.
    • onError(Throwable e) به Observer اطلاع می دهد که Observable با خطایی روبرو شده است

    به عنوان یک قاعده  یک  Observable صفر یا چند مورد را منتشر می کند که می تواند تکمیل شود یا خطا داشته باشد.

    به نظر پیچیده می رسد ، اما برخی از نمودارهای marble ممکن است همه چیز را روشن کند

    دایره نشان دهنده موردی است که از observable و بلوک سیاه نشان دهنده یک تکمیل یا خطا است. به عنوان مثال ، یک درخواست شبکه را observable کنید. این درخواست معمولاً یک مورد (response) منتشر می کند و بلافاصله تکمیل می شود.

    یک حرکت ماوس مختصات ماوس را منتشر می کند اما هرگز کامل نمی شود:

    در اینجا می توانید چندین مورد را که منتشر شده اند مشاهده کنید اما هیچ بلوکی نشان نمی دهد ماوس خطایی را کامل یا ایجاد کرده است.

    پس از اتمام یک observable ، هیچ مورد دیگری قابل انتشار نیست. در اینجا مثالی از رفتارهای بد مشاهده شده است که قرارداد Observable را نقض می کند:

    این یک observable بد است زیرا با انتشار یک مورد پس از علامت کامل شدن ، قرارداد Observable را نقض می کند.

     

    نحوه ایجاد یک Observable

    کتابخانه های بسیاری وجود دارد که به شما کمک می کند تقریباً از هر نوع رویدادی observable بسازید. با این حال ، گاهی اوقات فقط باید یک مطالعه انجام بدید. علاوه بر این یک روش عالی برای یادگیری در مورد الگوی Observable و برنامه نویسی reactive است!

    با استفاده از Observable.create() یک Observable ایجاد خواهید کرد. در اینجا نمونه آن است:

    Observable<T> create(ObservableOnSubscribe<T> source)

     

    نمونه بالا خوب و مختصر است ، اما چه معنایی دارد؟ برای درک این نمونه ، باید بدانید که ObservableOnSubscribe چیست. این interface با این قرارداد است:

    public interface ObservableOnSubscribe<T> {
      void subscribe(ObservableEmitter<T> e) throws Exception;
    }
    

     

    بنابراین "منبعی" که شما برای ایجاد Observable خود نیاز دارید ، باید subscribe() را در معرض دید قرار دهد ، که به نوبه خود برای تهیه یک فرستنده به عنوان یک پارامتر به هر آنچه که آن را فرا می خواند نیاز دارد. پس یک منتشر کننده چیست؟

    اینترفیس RxJava Emitter مشابه اینترفیس Observer است:

    public interface Emitter<T> {
      void onNext(T value);
      void onError(Throwable error);
      void onComplete();
    }
    

     

    ObservableEmitter ابزاری را برای لغو اشتراک فراهم می کند.

    برای تجسم کل این وضعیت ، به یک شیر آب فکر کنید که جریان آب را تنظیم می کند. لوله های آب مانند یک  Observable است  اگر وسیله ای برای ضربه زدن به آن دارید می تواند جریان آب را تحویل دهد. شما یک شیرآلات درست می کنید که می تواند خاموش و روشن شود ، مانند یک ObservableEmitter است و آن را به لوله های آب موجود در Observable.create() متصل می کنید. نتیجه یک شیر فانتزی خوب است. و البته ، شیر آب واکنش پذیر است ، زیرا به محض بستن آن ، جریان آب - داده ها - دیگر فعال نیستند.

    یک مثال باعث می شود وضعیت انتزاعی نباشد و وضوح بیشتری داشته باشد. وقت آن است که اولین observable های خود را ایجاد کنید!

    کلیک های دکمه را Observe کنید

    کد زیر را در کلاس CheeseActivity اضافه میکنیم

    private fun createButtonClickObservable(): Observable<String> {
        // 2
        return Observable.create { emitter ->
            // 3
            searchButton.setOnClickListener {
                // 4
                emitter.onNext(queryEditText.text.toString())
            }
    
            // 5
            emitter.setCancellable {
                // 6
                searchButton.setOnClickListener(null)
            }
        }
    }
    

     

    import های شما پس از وارد کردن کد فوق باید به صورت زیر باشد:

    import io.reactivex.Observable
    import kotlinx.android.synthetic.main.activity_cheeses.*
    

     

    شما کلاس Observable صحیح را وارد کرده اید و از Kotlin Android Extensions برای دریافت منابع برای مشاهده اشیا استفاده می کنید.

    توضیحات کد بالا:

    1. شما تابعی را اعلام می کنید که  Observableرا بازگرداند که رشته هایی را منتشر کند.
    2. شما یک Observable را با Observable.create() ایجاد کرده و با یک ObservableOnSubscribe جدید آن را عرضه می کنید.
    3. OnClickListener را در searchButton راه اندازی میکنید.
    4. هنگامی که رویداد کلیک اتفاق می افتد ، از طریق فرستنده onNext را فراخوانی میکنید و مقدار متن فعلی queryEditText را به آن منتقل میکنید.
    5. نگه داشتن reference ها می تواند باعث memory leak در جاوا یا کاتلین شود. این عادت مفیدی است که listener  ها را به محض اینکه دیگر نیازی به آنها نیست حذف کنید. اما وقتی در حال ایجاد Observable خود هستید ، چه چیزی را صدا می کنید؟ به همین دلیل ، ObservableEmitterCancellable() را تنظیم کرده است. با override کردن cancel() و هنگامی که Observable از بین می رود ، اجرای آن فراخوانی می شود ، مانند زمانی که Observable کامل می شود یا unsubscribe میشود.
    6. برای OnClickListener ، کد (null)setOnClickListener , listener  را حذف می کند

    اکنون که Observable خود را تعریف کردید ، باید subscription آن را تنظیم کنید. قبل از انجام این کار ، باید با یک interfaceدیگر یعنی Consumer آشنا شوید. این یک روش ساده برای پذیرش مقادیری است که از یک فرستنده(emitter) وارد می شود.

    public interface Consumer<T> {
      void accept(T t) throws Exception;
    }
    

     

    این interface زمانی مفید است که می خواهید subscription ساده یک Observable را تنظیم کنید.

    Interface که برای Observable مینویسیم به چندین نسخه   subscribe()احتیاج دارد که پرارامترهای مختلفی دارند. به عنوان مثال ، اگر دوست دارید می توانید از یک Observer کامل عبور کنید ، اما لازم است تمام متد های لازم را پیاده سازی کنید.

    اگر تمام آنچه در خارج از subscription  نیاز دارید این است که observer به مقادیر ارسال شده به onNext() پاسخ دهد, می توانید از نسخه subscribe() استفاده کنید که یک  single Consumerرا در بر می گیرد.

    این کار را دقیقاً هنگام subscribe در activity خود در ()onStart انجام خواهید داد.

    کد زیر را به CheeseActivity.kt اضافه میکنیم:

    override fun onStart() {
        super.onStart()
        // 1
        val searchTextObservable = createButtonClickObservable()
    
        searchTextObservable
                // 2
                .subscribe { query ->
                    // 3
                    cheeseSearchEngine.search(query)?.let { showResult(it) }
                }
    }
    

     

    توضیحی درباره مراحل میدهیم:

    1. ابتدا با فراخوانی متدی که به تازگی نوشتید ، یک  observable ایجاد کنید.
    2. مشترک شدن در  observable  با subscribe()  و ارائه یک Consumer ساده.
    3. در آخر  جستجو را انجام دهید و نتایج را نشان دهید.

    برنامه را اجرا کنید. چند حرف وارد کنید و روی دکمه جستجو ضربه بزنید. پس از یک تاخیر شبیه سازی شده (به CheeseSearchEngine مراجعه کنید) ، باید لیستی از پنیرها را که مطابق با درخواست شما هستند ، ببینید.

    RxJava Threading Model

    شما اولین تجربه برنامه نویسی reactive را داشته اید. یک مشکل وجود دارد: با لمس دکمه جستجو ، رابط کاربر برای چند ثانیه مسدود می شود.

    در Android Monitor ممکن است متوجه خط زیر شوید:

    > 08-24 14:36:34.554 3500-3500/com.raywenderlich.cheesefinder I/Choreographer: Skipped 119 frames!
      The application may be doing too much work on its main thread.
    

     

    این اتفاق می افتد زیرا جستجو بر روی main thread اجرا می شود.اگر جستجو برای انجام درخواست شبکه انجام شود ، Android با یک استثنا(crash) NetworkOnMainThreadException برنامه را میبندد. وقت آن است که آن را برطرف کنیم.

    یکی از توضیحات RxJava این است که به صورت پیش فرض  multi-thread است شبیه AsyncTask. با این حال ، اگر روش دیگری مشخص نشده باشد ، RxJava تمام کارها را در همان thread انجام می دهد که از آن فراخوانی شده است.

    می توانید این عمل را با عملگرهای subscribeOn و ObserveOn تغییر دهید.

    subscribeOn قرار است فقط یک بار در زنجیره اپراتورها فراخوانی شود. اگر اینگونه نباشد ، اولین فراخوانی اجرا می شود. subscribeOn یک thread را مشخص می کند که observable در آن subscribe می شود (یعنی ایجاد می شود). اگر از یک observable استفاده می کنید که event ها را از نمای Android منتشر می کنند ، باید مطمئن شوید که subscription در thread Android UI انجام شده است. از طرف دیگر اشکالی ندارد هر تعداد بار که خواستید در زنجیره Observeon را فراخوانی کنید.

    ObservOn یک thread را مشخص می کند که اپراتورهای بعدی در این زنجیره اجرا می شوند.  مثال زیر را ببینید:

    myObservable // observable will be subscribed on i/o thread
    
          .subscribeOn(Schedulers.io())
    
          .observeOn(AndroidSchedulers.mainThread())
    
          .map { /* this will be called on main thread... */ }
    
          .doOnNext{ /* ...and everything below until next observeOn */ }
    
          .observeOn(Schedulers.io())
    
          .subscribe { /* this will be called on i/o thread */ }
    

     

    مفیدترین scheduler ها عبارتند از:

    • ()Schedulers.io: مناسب برای کارهای ورودی و خروجی مانند درخواست شبکه یا عملکرد دیسک.
    • Schedulers.computation(): با وظایف محاسباتی مانند حلقه های رویداد و پردازش  callback ها بهترین عملکرد را دارد
    • AndroidSchedulers.mainThread() : عملگرهای بعدی را در UI thread اجرا می کند.
    •  

    اپراتور Map

    اپراتور map تابعی را برای هر مورد منتشر شده توسط یک  observableاعمال می کند و یک observable دیگر را که نتایج حاصل از آن فراخوانی های تابع را منتشر می کند  برمی گرداند. برای رفع مشکل thread نیز به این مورد احتیاج دارید.

    اگر observable به نام numbers دارید که موارد زیر را منتشر می کند:

    و اگر map را به صورت زیر اعمال می کنید:

    numbers.map { number -> number * number }

     

    نتیجه به شرح زیر خواهد بود:

    این یک روش مفید برای تکرار چندین مورد با کد کم است. بگذارید استفاده کنیم!

    متد ()onStart کلاس CheeseActivity به صورت زیر تغییر میدهیم:

    override fun onStart() {
        super.onStart()
    
        val searchTextObservable = createButtonClickObservable()
    
        searchTextObservable
                // 1
                .subscribeOn(AndroidSchedulers.mainThread())
                // 2
                .observeOn(Schedulers.io())
                // 3
                .map { cheeseSearchEngine.search(it)!! }
                // 4
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    showResult(it)
                }
    }
    

     

    کد بالا را مرور میکنیم:

    1. ابتدا مشخص میکنیم که کد به جای I/O thread باید روی main thread شروع شود. در Android ، همه کدهایی که با View کار می کنند باید روی main thread اجرا شوند.
    2. مشخص میکنیم که عملگر بعدی باید روی I/O thread فراخوانی شود.
    3. برای هر search query ، لیستی از نتایج را برمی گردانید.
    4. در آخر ، اطمینان حاصل میکنیم که نتایج به لیست main thread منتقل شده اند.

    پروژه را اجرا کنید. اکنون UI باید پاسخگو باشد حتی زمانی که جستجو در حال انجام است.

    نمایش ProgressBar با doOnNext

    برای این منظور شما به یک عملگر doOnNext نیاز دارید. doOnNext یک Consumer می گیرد و به شما امکان می دهد هر بار که موردی توسط observable منتشر می شود کاری انجام دهید.

    در کلاس CheeseActivity  , ()onStart را مانند زیر تغییر میدهیم:

    override fun onStart() {
        super.onStart()
    
        val searchTextObservable = createButtonClickObservable()
    
        searchTextObservable
                // 1
                .observeOn(AndroidSchedulers.mainThread())
                // 2
                .doOnNext { showProgress() }
                .observeOn(Schedulers.io())
                .map { cheeseSearchEngine.search(it)!! }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // 3
                    hideProgress()
                    showResult(it)
                }
    }
    

     

    توضیحات کد بالا:

    1. اطمینان حاصل میکنیم که اپراتور بعدی در زنجیره بر روی main thread اجرا می شود.
    2. عملگر doOnNext را اضافه میکنیم تا هر زمان که مورد جدیدی منتشر شد ()showProgress فراخوانی شود.
    3. فراموش نکنید هنگامی که می خواهید نتیجه ای را نمایش دهید ()hideProgress را فراخوانی کنید.

    پروژه اجرا کنید. هنگام شروع جستجو باید ببینید progress bar ظاهر می شود:

    Observe Text Changes

    اگر بخواهید وقتی کاربر برخی از متن ها را درست مثل Google تایپ می کند ، جستجو را انجام دهید چه؟

    ابتدا ، باید subscribe کنیم به  textchanges از TextView. متد زیر را به کلاس CheeseActivity اضافه میکنیم:

    // 1
    private fun createTextChangeObservable(): Observable<String> {
        // 2
        val textChangeObservable = Observable.create<String> { emitter ->
            // 3
            val textWatcher = object : TextWatcher {
    
                override fun afterTextChanged(s: Editable?) = Unit
    
                override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit
    
                // 4
                override fun onTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {
                    s?.toString()?.let { emitter.onNext(it) }
                }
            }
    
            // 5
            queryEditText.addTextChangedListener(textWatcher)
    
            // 6
            emitter.setCancellable {
                queryEditText.removeTextChangedListener(textWatcher)
            }
        }
    
        // 7
        return textChangeObservable
    }
    

     

    کد بالا را خط به خط بریم جلو:

    1. متدی را از نوع observable تعریف میکنیم برای تغییرات متن.
    2. تعریف متغیر textChangeObservable با استفاده از  ()create که نوع آن ObservableOnSubscribe است.
    3. هنگامی که یک observer از نوع اشتراکی تعریف میکنیم  اولین کاری که باید انجام شود ایجاد یک TextWatcher است.
    4. ما با قسمت ()beforeTextChanged و afterTextChanged() کاری نداریم اما در قسمت onTextChanged() مقدار متن جدید را به یک observer منتقل میکنیم.
    5. با فراخوانی ()addTextChangedListener, watcher را به TextView خود اضافه کنید.
    6. فراموش نکنید که watcher خود را حذف کنید. برای این کار ، از emitter.setCancellable() و نوشتن مجدد cancel()  برای فراخوانی     ()removeTextChangedListener استفاده میکنیم.
    7. سرانجام observable ایجاد شده را برگردانید.

    متغیر searchTextObservable که در متد onStart() , CheeseActivity تعریف کردیم را به صورت زیر تغییر دهید:

    
    val searchTextObservable = createTextChangeObservable()
    

     

    برنامه را اجرا کنید. وقتی شروع به تایپ متن در TextView می کنید باید جستجو شروع شود:

     

    درخواستها را براساس طول فیلتر میکنیم(Filter Queries by Length)

    منطقی نیست که با وارد کردن یک حرف شروع کنیم به جستجو کردن. برای رفع این مشکل ، از اپراتور قدرتمند filter استفاده میکنیم. filter فقط از مواردی عبور می کند که شرایط خاصی را برآورده می کنند. فیلتر یک predict را می گیرد در این حالت Predicate یک رشته را می گیرد و اگر طول رشته دو یا چند کاراکتر باشد ، true برمی گردد.

    return textChangeObservable در متد createTextChangeObservable() را با کد زیر جایگزین میکنیم:

    return textChangeObservable.filter { it.length >= 2 }

     

    همه کارها دقیقاً مشابه قبل خواهند بود ، با این تفاوت که نمایش داده های متنی با طول کمتر از 2 به زنجیره ارسال نمی شوند.

    برنامه را اجرا کنید ؛ شما باید ببینید که فقط در هنگام تایپ کردن کاراکتر دوم جستجو شروع می شود:

    Debounce Operator

    ما نمی خواهیم هر بار که درخواست یک کاراکتر تغییر کرد درخواست جدیدی به سرور ارسال کنیم.

    debounce یکی از آن اپراتورهایی است که قدرت واقعی الگوی reactive را نشان می دهد. دقیقاً مانند عملگر فیلتر debounce  مواردی را که توسط  observable منتشر می شود فیلتر می کند. debounce بعد از انتشار هر مورد برای مورد بعدی مدت زمان مشخصی منتظر می ماند. اگر طی این انتظار هیچ موردی منتشر نشود ، سرانجام آخرین مورد منتشر می شود:

    در ()createTextChangeObservable ، اپراتور debounce را درست در زیر فیلتر اضافه میکنیم تا دستور بازگشت مانند کد زیر باشد:

    return textChangeObservable
            .filter { it.length >= 2 }
            .debounce(1000, TimeUnit.MILLISECONDS) // add this line
    

     

    debounce قبل از انتشار آخرین متن 1000 میلی ثانیه منتظر می ماند.

     

    اپراتور Merge

    شما یک observable که نسبت به کلیک دکمه ها واکنش نشان می دهد شروع کردید و سپس یک observable را که به تغییرات فیلد متن واکنش نشان می دهد پیاده سازی کردید. اما observable شما در برابر هر دو واکنش نشان می دهد؟

    اپراتورهای زیادی برای ترکیب  observable ها وجود دارد. ساده ترین و مفیدترین آن ها merge است.

    merge موارد را از دو یا چند observable می گیرد و آنها را در یک observable قرار می دهد:

    متد onStart() را به شکل زیر تغییر میدهیم:

    override fun onStart() {
        super.onStart()
    
        val buttonClickStream = createButtonClickObservable()
        val textChangeStream = createTextChangeObservable()
    
        val searchTextObservable = Observable.merge<String>(buttonClickStream, textChangeStream)
    
    
        searchTextObservable
                // 1
                .observeOn(AndroidSchedulers.mainThread())
                // 2
                .doOnNext { showProgress() }
                .observeOn(Schedulers.io())
                .map { cheeseSearchEngine.search(it)!! }
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // 3
                    hideProgress()
                    showResult(it)
                }
    }
    
    

     

    برنامه خود را اجرا کنید. با قسمت متن و دکمه جستجو بازی کنید. جستجو وقتی شروع می شود که تایپ کردن دو یا چند نماد را تمام کنید یا وقتی دکمه جستجو را فشار می دهید.

     

    اپراتور Flowable

    با انتشار RxJava2 ، چارچوب کاملاً از ابتدا برای حل برخی از مشکلات که در کتابخانه اصلی به آنها پرداخته نشده بود دوباره طراحی شده است. یکی از موضوعات بسیار مهمی که در بروزرسانی به آن پرداخته شده ، ایده backpressure است.

    Backpressure این مفهوم است که یک observable سریعتر از آنچه consumer از عهده آن برمی آید داده منتشر می کند. مثال Firehose توییتر را در نظر بگیرید که با افزودن توییتر به پلتفرم توییتر  مرتباً توییت هایی را منتشر می کند. اگر بخواهید از  observable استفاده کنید  تا زمانی که حافظه دیگری در دسترس نباشد برنامه شما خراب می شود و با استفاده از API firehose امکان استفاده از آنها وجود ندارد. Flowable ها این مورد را در نظر می گیرند و به شما اجازه می دهند یک استراتژی BackPressure مشخص کنید تا به Flowable بگویید که چگونه می خواهید  Consumerاز موارد منتشر شده سریعتر از مصرف استفاده کند.

    استراتژی BackPressure:

    • BUFFER موارد را به همان روش RxJava 1 هندل می کند اما می توانید اندازه بافر را نیز اضافه کنید.
    • DROP هر موردی راکه consumer قادر به اداره آن نیست را رها می کند.
    • ERROR وقتی پایین دست نمی تواند ادامه دهد ، خطایی ایجاد می کند.
    • LATEST فقط آخرین مورد منتشر شده توسط onNext را جایگزین مقدار قبلی می کند.
    • MISSING بدون بافر کردن یا رها کردن در رویداد های بعدی.

    تبدیل  Observable به Flowable

    زمان آن است که با استفاده از این دانش جدید از استراتژی  backpressure ، observable های فوق را به  flowable تبدیل کنیم. ابتدا  observable هایی را که به برنامه خود اضافه کرده اید در نظر بگیرید. شما یک  observable دارید که با کلیک یک دکمه موارد را منتشر می کند و دیگری از ورودی صفحه کلید. با در نظر گرفتن این دو مورد در حالت اول تصور کنید که می توانید از LATEST و در حالت دوم می توانید از BUFFER استفاده کنید.

    CheeseActivity.kt را باز کرده و observable های خود را به مانند زیر تغییر میدهیم:

    val buttonClickStream = createButtonClickObservable()
            .toFlowable(BackpressureStrategy.LATEST) // 1
    
    val textChangeStream = createTextChangeObservable()
            .toFlowable(BackpressureStrategy.BUFFER) // 2
    

     

    1. با استفاده از LATEST BackpressureStrategy کلیک دکمه را به یک flowable تبدیل کردیم.
    2. با استفاده از BUFFER BackpressureStrategy جریان تغییر متن ورودی را یک Flowable تبدیل کردیم.

    در آخر ، عملگر  merge را تغییر میدهیم تا از Flowable نیز استفاده کند:

    val searchTextFlowable  = Flowable.merge<String>(buttonClickStream,
     textChangeStream)
    

     

    اکنون برای استفاده از مقدار جدید searchTextFlowable  به جای مقدار قبلی Observableجایی که آن را فراخوانی کرده ایم نامش را تغییر میدهیم.

    searchTextFlowable
            // 1
            .observeOn(AndroidSchedulers.mainThread())
            // 2
            .doOnNext { showProgress() }
            .observeOn(Schedulers.io())
            .map { cheeseSearchEngine.search(it)!! }
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                // 3
                hideProgress()
                showResult(it)
            }
    

     

    برنامه را دوباره اجرا کنید و باید یک برنامه در حال کار را ببینید که هیچ یک از مشکلات  observableرا ندارد.

     

    Maybe

    Maybe یک محاسبه است که یا یک value single بدون مقدار یا خطا منتشر می کند. آنها برای مواردی مانند به روزرسانی و حذف پایگاه داده مناسب هستند. در اینجا شما یک ویژگی جدید را با استفاده از Maybe به نوع خاصی از پنیر از برنامه اضافه می کنید و از Maybe برای انتشار مقدار هیچ استفاده می کنید.

    به کلاس CheeseAdapter میرویم و کد زیر را در onBindView اضافه میکنیم:

    // 1
    Maybe.create<Boolean> { emitter ->
        emitter.setCancellable {
            holder.itemView.imageFavorite.setOnClickListener(null)
        }
    
        holder.itemView.imageFavorite.setOnClickListener {
            emitter.onSuccess((it as CheckableImageView).isChecked) // 2
        }
    }.toFlowable().onBackpressureLatest() // 3
            .observeOn(Schedulers.io())
            .map { isChecked ->
                cheese.favorite = if (!isChecked) 1 else 0
                val database = CheeseDatabase.getInstance(holder.itemView.context).cheeseDao()
                database.favoriteCheese(cheese) // 4
                cheese.favorite // 5
            }
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe {
                holder.itemView.imageFavorite.isChecked = it == 1 // 6
            }
    

     

    توضیحات کد بالا:

    1. یک action از Maybe میسازیم.
    2.  چک کردن وضعیت در زمان .success
    3. تبدیل Maybe به Flowable
    4. انجام به روزرسانی را روی جدول پنیرها.
    5. نتیجه عملیات را برمیگرداند.
    6. از نتیجه حاصل از انتشار استفاده میکنیم تا تصویر قلب خالی به قلب پر شده تغییر داده شود.

    توجه: بهتر است که از Maybe برای عملیات حذف استفاده کنید اما به عنوان مثال در اینجا می توانید یک پنیر را favorate کنید.

    RxJava2 & Null

    مقدار Null دیگر در RxJava2 پشتیبانی نمی شود.استفاده از null باعث ایجاد NullPointerException میشود.

     

    RxJava and Activity/Fragment lifecycle

    متدهای setCancellable قابل تنظیم را به خاطر می آورید؟ تا زمانی که observable لغو نشود  آنها unsubscribe نخواهند شد.

    فراخوانی Observable.subscribe() یک Disposable را برمیگرداند. Disposable یک interface است که دارای دو متد است:

    public interface Disposable {
      void dispose();  // ends a subscription
      boolean isDisposed(); // returns true if resource is disposed (unsubscribed)
    }
    

     

    property زیر را به CheeseActivity اضافه کنید:

    private lateinit var disposable: Disposable

     

    در onStart() مقدار برگشتی subscribe() را با استفاده از کد زیر در  disposable تنظیم کنید (فقط خط اول تغییر می کند):

    disposable = searchTextFlowable // change this line
            // 1
            .observeOn(AndroidSchedulers.mainThread())
            // 2
            .doOnNext { showProgress() }
            .observeOn(Schedulers.io())
            .map { cheeseSearchEngine.search(it)!! }
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                // 3
                hideProgress()
                showResult(it)
            }
    

     

    از آنجا که observable در ()onStart , subscribe شد ،onStop() مکانی عالی برای unsubscribe خواهد بود.

    کد زیر را به CheeseActivity.kt اضافه کنید:

    override fun onStop() {
        super.onStop()
        if (!disposable.isDisposed) {
            disposable.dispose()
        }
    }
    

     

    برنامه را اجرا کنید. خودتان هیچ تغییری را مشاهده نخواهید کرد ، اما اکنون این برنامه با موفقیت از  RxJava memory leakجلوگیری می کند.

     

    سوالات و مشکلات خود را در قسمت کامنت ها مطرح کنید تا ما به آنها پاسخ دهیم.

     


    • نویسنده: میثم بابائی

    ارسال دیدگاه

    برای افزودن دیدگاه خود، نیاز است ابتدا وارد حساب کاربری‌تان شوید


    دیدگاه کاربران