Channels trong Coroutines - p1


#1

Hi,

phần một nhà quê, chúng ta đã đi từ nông thôn lên thành thị với con tàu tên là Coroutines, tin vui là có nhiều con tàu như vậy trong ứng dụng chúng ta, tin ko vui lắm là các con tàu này cần một cơ chế để có thể liên lạc với nhau, cơ chế đó tên là Channels

Mình xin nói một chút ở đây, đó là bản chất coroutines cũng hông có gì magic ghê ghớm lắm mà tận dụng các khái niệm như Future, Job, Deferred, BlockingQueue trong Java và làm cho nó trông dễ dàng hơn mà thôi, nên hy vọng các bạn không nên quá kỳ vọng :slight_smile:

Giờ bắt đầu bằng ví dụ đơn giản nhất :

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {

    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
    }
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

1
4
9
16
25
Done!

Chúng ta tạo ra một channels với loại message là Int rồi gửi và nhận thông qua 02 hàm send() và receive() , dễ hiểu nhỉ

Tiếp theo mình cùng nhau tạo ứng dụng tung-tăng (ping-pong) để hình dung :

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    val receiveChannel = produceMessage()

    receiveChannel.consumeEach {
        println("Tang : ${it}")
    }
}

fun CoroutineScope.produceMessage() : ReceiveChannel<String> = produce <String> {
    var x = 1
    while (true) {
        send( "Tung : ${x++}")
    }
}

Kết quả cũng thiệt bất ngờ, ứng dụng chúng ta chạy ngon lành, nếu bạn làm việc này với thread thì mình không chắc nó sẽ chạy được trong bao lâu

Đoạn code trên tạo một channels kiểu String, gửi message vô tận, phần xử lý thì chỉ đơn giản là in ra message từ channels thôi, nhưng điểm khác nhau là nó cũng lặp vô tận nốt. Chú ý là ở đây phần gửi và nhận là chạy độc lập trong 02 coroutines khác nhau nhưng lại có tính tuần tự gửi xong rồi nhận, gọi là structured concurrency, coroutines làm chúng trông có vẻ đơn giản hơn thay vì callback, async các kiểu. Ở đây nội dung message kiểu String được truyền thông qua channels để thao tác giữa các coroutines thay vì chia sẻ các biến giữa các thread dễ gây ra deadlock

Bên cạnh đó, Kotlin cũng thêm vào một số tính năng hay ho trên channels như pipelines chẳng hạn, ý tưởng là từ khi message được gửi đi từ coroutines này tới coroutines nhận thì message đó có thể được xử lý bởi một coroutines thứ ba, thứ tư … trước khi nó đến được coroutines nhận, cùng xem qua ví dụ :

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    var receiveChannel = produceMessage()
    receiveChannel = format(receiveChannel, "Formatted")
    receiveChannel.consumeEach {
        println("Tung : ${it}")
    }
}

fun CoroutineScope.produceMessage() : ReceiveChannel<String> = produce <String> {
    var x = 1
    while (true) {
        send( "Tang : ${x++}")
    }
}

fun CoroutineScope.format(channels : ReceiveChannel<String>, pattern: String) = produce<String> {
    for (message in channels) {
        send("${pattern} - ${message}")
    }
}

Ở trên, mình thêm một coroutines nữa để format message trước khi gửi tới receiver , lúc này coroutines format sẽ có thêm tham số là channels trước nó , cũng có thể thêm tham số khác ngoài channels , chỗ này mình thêm một chuỗi pattern vô :

Tung : Formatted - Tang : 181376304

(còn tiếp)