Monday, January 13, 2020

Another sample code of SPMC (Single Producer Mutiple Consumer) In Rust

Following code are re-implementation of a simple Go channel program listed below.
It just reads strings  from input channel and convert it to struct data and send it to another channel, then when all the input string channel are processed,start the process of printing the data from the channel. ( this behavior is not so natural, but just a sample code to see how join will occur.)

It now possible to write pretty much similar code in Rust to the corresponding Go channel code using the  crossbeam library.

 https://stjepang.github.io/2017/08/13/designing-a-channel.html
 https://stjepang.github.io/2019/01/29/lock-free-rust-crossbeam-in-2019.html


Although this is a simple program, but it requires SPMC channel, the current Rust's std library only supports mpsc channel,  it becomes quite complicated if we try to simulate spmc using only mpsc channel. See several related articles found in web.

 https://medium.com/@polyglot_factotum/rust-concurrency-patterns-communicate-by-sharing-your-sender-11a496ce7791

The last part of Rust book is also describing similar thing in complicated approach.
btw, this Rust code define generic create_receiver function, but Go cannot since it has no generic.
 

In the end, Go allows to write a code causing data race easily.
While rustc guarantees the code has no data race issues.
In fact, from programming point of view, Rust is higher level programming language for concurrent problems than Go while its run-time performance is better than Go. 

Although this crossbeam is not part of std lib, but it will be in this year, 2020(?)
https://blog.yoshuawuyts.com/rust-2020/

the codes are available in https://github.com/calathus/channel-sample

Rust:

extern crate crossbeam;

use std::thread;
use crossbeam::crossbeam_channel::{Receiver, Sender, unbounded};
use crossbeam::sync::{WaitGroup};

const THREADS: usize = 4;

struct Info {
    n: i32,
    s: String,
}

fn create_receiver<T: Clone>(vec: Vec<T>) -> Receiver<T> {
    let (s, r) = unbounded();
    for e in vec.iter() {
        s.send(e.clone()).unwrap();
    }
    return r;
}

fn process_data(i: i32, ss_r: Receiver<String>, info_s: Sender<Info>) {
    for s in ss_r.iter() {
        info_s.send(Info{n: i, s: s}).unwrap();
    }
    println!("process_data {} done.", i);
}


fn main() {
    let vec = get_data();
    let ss_r = create_receiver(vec);
    let (info_s, info_r) = unbounded();

    thread::spawn(move || {
        let wg = WaitGroup::new();

        for i in 0..THREADS {
            let wg0 = wg.clone();
            let ss_r0 = ss_r.clone();
            let info_s0 = info_s.clone();

            thread::spawn(move || {
                process_data(i as i32, ss_r0, info_s0);
                drop(wg0);
            });
        }
        wg.wait();
        drop(info_s)
    });

    println!(">> start info printing.");
    for info in info_r.iter() {
        println!("n: {}, s: {}", info.n, info.s);
    }
    println!("done.");
}


fn get_data()-> Vec<String> {
    let mut v: Vec<String> = Vec::new();
    for i in 0..10000 {
        let s = format!("s{}", i);
        v.push(s);
    }
    return v;
}



GO:
package main

import (
    "fmt"
    "strings"
    "sync"
)

type Info struct {
    p int
    s string
}

func create_ssc() chan string {
    ssc := make(chan string)
    go func() {
        defer close(ssc)
        for _, s := range data() {
            ssc <- s
        }
    }()
    return ssc
}

func create_infoc() chan *Info {
    infos := make(chan *Info)
    return infos
}

func handle_ssc(n int, ssc chan string, infos chan *Info) {
    for s := range ssc {
        infos <- &Info{p: n, s: strings.ToUpper(s)}
    }
}

func main() {
    var width = 4

    ssc := create_ssc()
    infos := create_infoc()

    go func() {
        defer close(infos)
        var wg sync.WaitGroup
        wg.Add(width)

        for i := 0; i < width; i++ {
            go func(n int) {
                defer wg.Done()
                handle_ssc(n, ssc, infos)
            }(i)
        }
        wg.Wait()
    }()

    for i := range infos {
        fmt.Println(i)
    }
}

func data() []string {
    return []string{
        "aaa",
        "bb",
        "cc",
        "sss",
        "qq",
        "ww"}
}


No comments:

Post a Comment

Recursive Matrix and the parallel matrix multiplication using crossbeam and generic constant

This was planned project I posted before. Basically in order to evaluate Rust's claim for zero cost abstraction and the effectiveness o...