欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Golang: 外部排序项目

程序员文章站 2024-03-03 19:12:40
...

项目流程:

  1. 生成一个随机数文件,假设文件很大
  2. 从文件中分块地读取数据到内存,进行各个结点的内部排序
  3. 归并排序得到最终的排序结果写入文件中

应用场景:

  1. 单机内存大小不足时,想要对大数据进行排序。
  2. 可以添加其他分布式管理的功能,例如对分布式日志文件进行整合。

涉及到的Golang特性:

  1. 面向接口–Reader/Writer接口的使用
  2. 函数式编程
  3. 并发编程

源码链接:https://github.com/chao2015/externalsort

源码分析:

Golang: 外部排序项目

1. channel通信

// 数据源来自于一个Array
func ArraySource(a ...int) <-chan int {
    // 调用的真实情况是,函数新建一个channel并马上返回,并行的goroutine来进行发送数据的操作,发送完后记得close。
    // func: 1.新建一个channel
    out := make(chan int)
    // go: 1.发送数据(channel是goroutine之间的通信管道)
    go func() {
        for _, v := range a {
            out <- v
        }
        // go: 2.关闭channel,否则会报错:fatal error: all goroutines are asleep - deadlock!
        close(out)
    }()
    // func: 2.返回这个channel
    return out
}

func TestArraySource(t *testing.T) {
    // 1. channel通信
    p := pipeline.ArraySource(3, 2, 6, 7, 4)
    // (1)
    //for {
    //  if num, ok := <-p; ok {
    //      fmt.Println(num)
    //  } else {
    //      break
    //  }
    //}

    //(2) 简略写法
    for v := range p {
        fmt.Println(v)
    }
}

Output:

3
2
6
7
4

2. 内部排序

var startTime time.Time

func Init() {
    startTime = time.Now()
}

// 内部排序
func InMemSort(in <-chan int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        // Read into memory
        a := []int{}
        for v := range in {
            a = append(a, v)
        }
        fmt.Println("Read done:", time.Now().Sub(startTime))

        // Sort
        sort.Ints(a)
        fmt.Println("InMemSort done:", time.Now().Sub(startTime))

        // Output
        for _, v := range a {
            out <- v
        }
        close(out)
    }()
    return out
}

func TestInMemSort(t *testing.T) {
    // 2. 内部排序
    p := pipeline.InMemSort(
        pipeline.ArraySource(3, 2, 6, 7, 4))
    for v := range p {
        fmt.Println(v)
    }
}

Output:

Read done: 20.008µs
InMemSort done: 62.384µs
2
3
4
6
7

3. 归并排序

func Merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        v1, ok1 := <-in1
        v2, ok2 := <-in2
        for ok1 || ok2 {
            if !ok2 || (ok1 && v1 <= v2) {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }
        close(out)
        fmt.Println("Merge done:", time.Now().Sub(startTime))
    }()
    return out
}

func TestMerge(t *testing.T) {
    pipeline.Init()

    // 3. 归并排序
    p := pipeline.Merge(
        pipeline.InMemSort(pipeline.ArraySource(3, 2, 6, 7, 4)),
        pipeline.InMemSort(pipeline.ArraySource(7, 4, 0, 3, 2, 8, 13, 8)))
    for v := range p {
        fmt.Println(v)
    }
}

Output:

Read done: 33.618µs
InMemSort done: 82.779µs
Read done: 60.004µs
InMemSort done: 122.588µs
Merge done: 133.939µs
0
2
2
3
3
4
4
6
7
7
8
13

4. 随机数生成函数

// 随机生成count个int型数据
func RandomSource(count int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < count; i++ {
            out <- rand.Int()
        }
        close(out)
    }()
    return out
}

func TestRandomSource(t *testing.T) {
    p := pipeline.RandomSource(10)
    for v := range p {
        fmt.Println(v)
    }
}

Output:

5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
... ...

5. 读写数据

读写端要做到:

  1. buffer大小一致
  2. 大小端一致
// 读数据。第一个参数是读的来源对象,第二个参数是读取长度(-1全读)。输出是一个channel
func ReaderSource(reader io.Reader, chunkSize int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        // 64位系统的int型大小是64,所以用一个64位buffer = byte(8)*8
        buffer := make([]byte, 8)
        // 读取长度的控制变量
        bytesRead := 0
        for {
            // n是读取的长度
            n, err := reader.Read(buffer)
            bytesRead += n
            // 可能最后读取4字节数据,nil=EOF,所以要先读取数据,再判断nil
            if n > 0 {
                // 大端还是小端,发送和接收端统一即可
                out <- int(binary.BigEndian.Uint64(buffer))
            }
            if err != nil ||
                (chunkSize != -1 && bytesRead >= chunkSize) {
                break
            }
        }
        close(out)
    }()
    return out
}

// 写数据。第一个参数是写的目的对象,第二个参数是写的数据channel
func WriteSink(writer io.Writer, in <-chan int) {
    for v := range in {
        buffer := make([]byte, 8)
        binary.BigEndian.PutUint64(buffer, uint64(v))
        writer.Write(buffer)
    }
}

func main() {
    const filename = "small.in"
    const n = 64

    // 新建文件,返回可用的文件描述符
    file, err := os.Create(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()

    // 生成随机数
    p := pipeline.RandomSource(n)

    // 写数据到文件
    // 包装文件描述符,使用缓存机制,提高读写速度
    writer := bufio.NewWriter(file)
    pipeline.WriteSink(writer, p)
    writer.Flush() // 确保缓存数据全部写入

    // 上面的文件描述符offset在末尾,不能用于读取
    // 打开文件
    file, err = os.Open(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()

    // 读取数据
    p = pipeline.ReaderSource(bufio.NewReader(file), -1)
    count := 0
    for v := range p {
        fmt.Println(v)
        count++
        if count >= 100 {
            break
        }
    }
}

Output:

// 生成small.in数据文件,可以查看文件大小,以上代码生成64*8=512bit
5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
... ...

6. 单机版外部排序

  • 前面4步就是demo测试,通过第5步生成small.in的待排序文件是必要的。
  • 下面所要做的工作,就是从待排序文件中分块读取数据,块内排序后再归并排序,生成结果。
// 搭建归并节点组,递归调用实现2路归并
func MergeN(inputs ...<-chan int) <-chan int {
    if len(inputs) == 1 {
        return inputs[0]
    }
    m := len(inputs) / 2
    // merge inputs[0..m) and inputs [m..end)
    return Merge(
        MergeN(inputs[:m]...),
        MergeN(inputs[m:]...))
}

// pipeline的搭建及运行,单机上时,分块数(chunkCount)最好是cpu的核数
func createPipeline(
    filename string,
    fileSize, chunkCount int) <-chan int {

    chunkSize := fileSize / chunkCount
    pipeline.Init()

    // 初始化结点组
    sortResults := []<-chan int{}
    for i := 0; i < chunkCount; i++ {
        // 打开文件
        file, err := os.Open(filename)
        if err != nil {
            panic(err)
        }
        // 设置offset
        file.Seek(int64(i*chunkSize), 0)
        // 读取数据
        source := pipeline.ReaderSource(
            bufio.NewReader(file), chunkSize)
        // 内部排序后,追加到结点组中
        sortResults = append(sortResults, pipeline.InMemSort(source))
    }

    // 归并结点组
    return pipeline.MergeN(sortResults...)
}

func writeToFile(p <-chan int, filename string) {
    // 创建文件
    file, err := os.Create(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    // 写入文件
    writer := bufio.NewWriter(file)
    defer writer.Flush()
    pipeline.WriteSink(writer, p)
}

func printFile(filename string) {
    // 打开文件
    file, err := os.Open(filename)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    // 读取数据
    p := pipeline.ReaderSource(file, -1)
    count := 0
    for v := range p {
        fmt.Println(v)
        count++
        if count >= 100 {
            break
        }
    }
}

func main() {
    // small 512 4
    // large 80000000 4
    const filename_prefix = "small" 

    // 单机版
    p := createPipeline(filename_prefix+".in", 512, 4)  // 文件大小(512/80000000),读取块数(4)

    writeToFile(p, filename_prefix+".out")
    printFile(filename_prefix + ".out")
}

Output:

// 生成small.out文件
Read done: 357.557µs
InMemSort done: 646.607µs
Read done: 703.595µs
InMemSort done: 717.006µs
Read done: 752.546µs
InMemSort done: 768.127µs
Merge done: 809.478µs
Read done: 925.135µs
InMemSort done: 1.103011ms
Merge done: 1.149634ms
Merge done: 1.167648ms
Sorted:
261049867304784443
545291762129038907
605394647632969758
685213522303989579
732830328053361739
894385949183117216
... ...

7. 集群版外部排序

  • 通过多端口之间的tcp通信来模拟集群。
// WriteSink的网络封装,写入数据
// 第一个参数是端口号,如":7000",第二个参数是传输的数据channel
func NetworkSink(addr string, in <-chan int) {
    // 监听
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        panic(err)
    }

    go func() {
        defer listener.Close()

        // 等待连接请求,返回下一个连接
        conn, err := listener.Accept()
        if err != nil {
            panic(err)
        }
        defer conn.Close()
        // 写入数据
        writer := bufio.NewWriter(conn)
        defer writer.Flush()
        WriteSink(writer, in)
    }()
}

// ReaderSource的网络封装,读取数据
// 唯一参数是端口号。输出一个channel
func NetworkSource(addr string) <-chan int {
    out := make(chan int)
    go func() {
        // 连接
        conn, err := net.Dial("tcp", addr)
        if err != nil {
            panic(err)
        }
        // 读取数据
        r := ReaderSource(bufio.NewReader(conn), -1)
        for v := range r {
            out <- v
        }
        close(out)
    }()
    return out
}

func createNetworkPipeline(
    filename string,
    fileSize, chunkCount int) <-chan int {

    chunkSize := fileSize / chunkCount
    pipeline.Init()

    // 初始化端口号组
    sortAddr := []string{}
    for i := 0; i < chunkCount; i++ {
        // 打开文件
        file, err := os.Open(filename)
        if err != nil {
            panic(err)
        }
        // 设置offset
        file.Seek(int64(i*chunkSize), 0)
        // 读取数据
        source := pipeline.ReaderSource(bufio.NewReader(file), chunkSize)
        // 端口号
        addr := ":" + strconv.Itoa(7000+i)
        // 起tcp服务
        pipeline.NetworkSink(addr, pipeline.InMemSort(source))
        // 追加到端口号组
        sortAddr = append(sortAddr, addr)
    }

    // 初始化结点组
    sortResults := []<-chan int{}
    for _, addr := range sortAddr {
        sortResults = append(sortResults, pipeline.NetworkSource(addr))
    }

    // 归并结点组
    return pipeline.MergeN(sortResults...)
}

func main() {
    // small 512 4
    // large 80000000 4
    const filename_prefix = "small"

    // 网络版
    p := createNetworkPipeline(filename_prefix+".in", 512, 4)

    writeToFile(p, filename_prefix+".out")
    printFile(filename_prefix + ".out")
}

Output:

Read done: 863.808µs
InMemSort done: 1.21919ms
Read done: 1.256505ms
InMemSort done: 1.262217ms
Read done: 1.233068ms
Read done: 1.309252ms
InMemSort done: 1.352371ms
InMemSort done: 1.313725ms
Merge done: 1.902537ms
Merge done: 2.043074ms
Merge done: 2.066381ms
Sorted:
261049867304784443
545291762129038907
605394647632969758
685213522303989579
732830328053361739
... ...