Golang: 外部排序项目
程序员文章站
2024-03-03 19:12:40
...
项目流程:
- 生成一个随机数文件,假设文件很大
- 从文件中分块地读取数据到内存,进行各个结点的内部排序
- 归并排序得到最终的排序结果写入文件中
应用场景:
- 单机内存大小不足时,想要对大数据进行排序。
- 可以添加其他分布式管理的功能,例如对分布式日志文件进行整合。
涉及到的Golang特性:
- 面向接口–Reader/Writer接口的使用
- 函数式编程
- 并发编程
源码链接:https://github.com/chao2015/externalsort
源码分析:
1. channel通信
// 数据源来自于一个Array
func ArraySource(a ...int) <-chan int {
// 调用的真实情况是,函数新建一个channel并马上返回,并行的goroutine来进行发送数据的操作,发送完后记得close。
// func: 1.新建一个channel
out := make(chan int)
// go: 1.发送数据(channel是goroutine之间的通信管道)
go func() {
for _, v := range a {
out <- v
}
// go: 2.关闭channel,否则会报错:fatal error: all goroutines are asleep - deadlock!
close(out)
}()
// func: 2.返回这个channel
return out
}
func TestArraySource(t *testing.T) {
// 1. channel通信
p := pipeline.ArraySource(3, 2, 6, 7, 4)
// (1)
//for {
// if num, ok := <-p; ok {
// fmt.Println(num)
// } else {
// break
// }
//}
//(2) 简略写法
for v := range p {
fmt.Println(v)
}
}
Output:
3
2
6
7
4
2. 内部排序
var startTime time.Time
func Init() {
startTime = time.Now()
}
// 内部排序
func InMemSort(in <-chan int) <-chan int {
out := make(chan int, 1024)
go func() {
// Read into memory
a := []int{}
for v := range in {
a = append(a, v)
}
fmt.Println("Read done:", time.Now().Sub(startTime))
// Sort
sort.Ints(a)
fmt.Println("InMemSort done:", time.Now().Sub(startTime))
// Output
for _, v := range a {
out <- v
}
close(out)
}()
return out
}
func TestInMemSort(t *testing.T) {
// 2. 内部排序
p := pipeline.InMemSort(
pipeline.ArraySource(3, 2, 6, 7, 4))
for v := range p {
fmt.Println(v)
}
}
Output:
Read done: 20.008µs
InMemSort done: 62.384µs
2
3
4
6
7
3. 归并排序
func Merge(in1, in2 <-chan int) <-chan int {
out := make(chan int, 1024)
go func() {
v1, ok1 := <-in1
v2, ok2 := <-in2
for ok1 || ok2 {
if !ok2 || (ok1 && v1 <= v2) {
out <- v1
v1, ok1 = <-in1
} else {
out <- v2
v2, ok2 = <-in2
}
}
close(out)
fmt.Println("Merge done:", time.Now().Sub(startTime))
}()
return out
}
func TestMerge(t *testing.T) {
pipeline.Init()
// 3. 归并排序
p := pipeline.Merge(
pipeline.InMemSort(pipeline.ArraySource(3, 2, 6, 7, 4)),
pipeline.InMemSort(pipeline.ArraySource(7, 4, 0, 3, 2, 8, 13, 8)))
for v := range p {
fmt.Println(v)
}
}
Output:
Read done: 33.618µs
InMemSort done: 82.779µs
Read done: 60.004µs
InMemSort done: 122.588µs
Merge done: 133.939µs
0
2
2
3
3
4
4
6
7
7
8
13
4. 随机数生成函数
// 随机生成count个int型数据
func RandomSource(count int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < count; i++ {
out <- rand.Int()
}
close(out)
}()
return out
}
func TestRandomSource(t *testing.T) {
p := pipeline.RandomSource(10)
for v := range p {
fmt.Println(v)
}
}
Output:
5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
... ...
5. 读写数据
读写端要做到:
- buffer大小一致
- 大小端一致
// 读数据。第一个参数是读的来源对象,第二个参数是读取长度(-1全读)。输出是一个channel
func ReaderSource(reader io.Reader, chunkSize int) <-chan int {
out := make(chan int, 1024)
go func() {
// 64位系统的int型大小是64,所以用一个64位buffer = byte(8)*8
buffer := make([]byte, 8)
// 读取长度的控制变量
bytesRead := 0
for {
// n是读取的长度
n, err := reader.Read(buffer)
bytesRead += n
// 可能最后读取4字节数据,nil=EOF,所以要先读取数据,再判断nil
if n > 0 {
// 大端还是小端,发送和接收端统一即可
out <- int(binary.BigEndian.Uint64(buffer))
}
if err != nil ||
(chunkSize != -1 && bytesRead >= chunkSize) {
break
}
}
close(out)
}()
return out
}
// 写数据。第一个参数是写的目的对象,第二个参数是写的数据channel
func WriteSink(writer io.Writer, in <-chan int) {
for v := range in {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(v))
writer.Write(buffer)
}
}
func main() {
const filename = "small.in"
const n = 64
// 新建文件,返回可用的文件描述符
file, err := os.Create(filename)
if err != nil {
panic(err)
}
defer file.Close()
// 生成随机数
p := pipeline.RandomSource(n)
// 写数据到文件
// 包装文件描述符,使用缓存机制,提高读写速度
writer := bufio.NewWriter(file)
pipeline.WriteSink(writer, p)
writer.Flush() // 确保缓存数据全部写入
// 上面的文件描述符offset在末尾,不能用于读取
// 打开文件
file, err = os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
// 读取数据
p = pipeline.ReaderSource(bufio.NewReader(file), -1)
count := 0
for v := range p {
fmt.Println(v)
count++
if count >= 100 {
break
}
}
}
Output:
// 生成small.in数据文件,可以查看文件大小,以上代码生成64*8=512bit
5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
... ...
6. 单机版外部排序
- 前面4步就是demo测试,通过第5步生成small.in的待排序文件是必要的。
- 下面所要做的工作,就是从待排序文件中分块读取数据,块内排序后再归并排序,生成结果。
// 搭建归并节点组,递归调用实现2路归并
func MergeN(inputs ...<-chan int) <-chan int {
if len(inputs) == 1 {
return inputs[0]
}
m := len(inputs) / 2
// merge inputs[0..m) and inputs [m..end)
return Merge(
MergeN(inputs[:m]...),
MergeN(inputs[m:]...))
}
// pipeline的搭建及运行,单机上时,分块数(chunkCount)最好是cpu的核数
func createPipeline(
filename string,
fileSize, chunkCount int) <-chan int {
chunkSize := fileSize / chunkCount
pipeline.Init()
// 初始化结点组
sortResults := []<-chan int{}
for i := 0; i < chunkCount; i++ {
// 打开文件
file, err := os.Open(filename)
if err != nil {
panic(err)
}
// 设置offset
file.Seek(int64(i*chunkSize), 0)
// 读取数据
source := pipeline.ReaderSource(
bufio.NewReader(file), chunkSize)
// 内部排序后,追加到结点组中
sortResults = append(sortResults, pipeline.InMemSort(source))
}
// 归并结点组
return pipeline.MergeN(sortResults...)
}
func writeToFile(p <-chan int, filename string) {
// 创建文件
file, err := os.Create(filename)
if err != nil {
panic(err)
}
defer file.Close()
// 写入文件
writer := bufio.NewWriter(file)
defer writer.Flush()
pipeline.WriteSink(writer, p)
}
func printFile(filename string) {
// 打开文件
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
// 读取数据
p := pipeline.ReaderSource(file, -1)
count := 0
for v := range p {
fmt.Println(v)
count++
if count >= 100 {
break
}
}
}
func main() {
// small 512 4
// large 80000000 4
const filename_prefix = "small"
// 单机版
p := createPipeline(filename_prefix+".in", 512, 4) // 文件大小(512/80000000),读取块数(4)
writeToFile(p, filename_prefix+".out")
printFile(filename_prefix + ".out")
}
Output:
// 生成small.out文件
Read done: 357.557µs
InMemSort done: 646.607µs
Read done: 703.595µs
InMemSort done: 717.006µs
Read done: 752.546µs
InMemSort done: 768.127µs
Merge done: 809.478µs
Read done: 925.135µs
InMemSort done: 1.103011ms
Merge done: 1.149634ms
Merge done: 1.167648ms
Sorted:
261049867304784443
545291762129038907
605394647632969758
685213522303989579
732830328053361739
894385949183117216
... ...
7. 集群版外部排序
- 通过多端口之间的tcp通信来模拟集群。
// WriteSink的网络封装,写入数据
// 第一个参数是端口号,如":7000",第二个参数是传输的数据channel
func NetworkSink(addr string, in <-chan int) {
// 监听
listener, err := net.Listen("tcp", addr)
if err != nil {
panic(err)
}
go func() {
defer listener.Close()
// 等待连接请求,返回下一个连接
conn, err := listener.Accept()
if err != nil {
panic(err)
}
defer conn.Close()
// 写入数据
writer := bufio.NewWriter(conn)
defer writer.Flush()
WriteSink(writer, in)
}()
}
// ReaderSource的网络封装,读取数据
// 唯一参数是端口号。输出一个channel
func NetworkSource(addr string) <-chan int {
out := make(chan int)
go func() {
// 连接
conn, err := net.Dial("tcp", addr)
if err != nil {
panic(err)
}
// 读取数据
r := ReaderSource(bufio.NewReader(conn), -1)
for v := range r {
out <- v
}
close(out)
}()
return out
}
func createNetworkPipeline(
filename string,
fileSize, chunkCount int) <-chan int {
chunkSize := fileSize / chunkCount
pipeline.Init()
// 初始化端口号组
sortAddr := []string{}
for i := 0; i < chunkCount; i++ {
// 打开文件
file, err := os.Open(filename)
if err != nil {
panic(err)
}
// 设置offset
file.Seek(int64(i*chunkSize), 0)
// 读取数据
source := pipeline.ReaderSource(bufio.NewReader(file), chunkSize)
// 端口号
addr := ":" + strconv.Itoa(7000+i)
// 起tcp服务
pipeline.NetworkSink(addr, pipeline.InMemSort(source))
// 追加到端口号组
sortAddr = append(sortAddr, addr)
}
// 初始化结点组
sortResults := []<-chan int{}
for _, addr := range sortAddr {
sortResults = append(sortResults, pipeline.NetworkSource(addr))
}
// 归并结点组
return pipeline.MergeN(sortResults...)
}
func main() {
// small 512 4
// large 80000000 4
const filename_prefix = "small"
// 网络版
p := createNetworkPipeline(filename_prefix+".in", 512, 4)
writeToFile(p, filename_prefix+".out")
printFile(filename_prefix + ".out")
}
Output:
Read done: 863.808µs
InMemSort done: 1.21919ms
Read done: 1.256505ms
InMemSort done: 1.262217ms
Read done: 1.233068ms
Read done: 1.309252ms
InMemSort done: 1.352371ms
InMemSort done: 1.313725ms
Merge done: 1.902537ms
Merge done: 2.043074ms
Merge done: 2.066381ms
Sorted:
261049867304784443
545291762129038907
605394647632969758
685213522303989579
732830328053361739
... ...
下一篇: iOS xib设置阴影
推荐阅读
-
Golang: 外部排序项目
-
Android项目类似淘宝 电商 搜索功能,监听软键盘搜索事件,延迟自动搜索,以及时间排序的搜索历史记录的实现
-
golang_切片排序:使用sort.Slice进行切片的排序
-
golang内置排序sort.Slice()
-
tomcat启动外部项目
-
HttpClient实现调用外部项目接口工具类的示例
-
JAVA项目更新打包部署之Eclipse外部工具升级版(Eclipse External Tools) 博客分类: JAVA
-
JAVA项目更新打包部署之Eclipse外部工具升级版(Eclipse External Tools) 博客分类: JAVA
-
php项目开发中用到的快速排序算法分析
-
maven项目引用外部jar包的方法