博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用 Kafka 和 MongoDB 进行 Go 异步处理
阅读量:6801 次
发布时间:2019-06-26

本文共 4324 字,大约阅读时间需要 14 分钟。

hot3.png

在这个示例中,我将数据的保存和 MongoDB 分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。

下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。

 

153916112292323660dbcdc

 

 

 

 

微服务 1 —— 是一个 REST 式微服务,它从一个 /POST http 调用中接收数据。接收到请求之后,它从 http 请求中检索数据,并将它保存到 Kafka。保存之后,它通过 /POST 发送相同的数据去响应调用者。

微服务 2 —— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 MongoDB 中。

我们开始吧!

首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例:

$ cd /
/kafka_2.11-1.1.0$ bin/zookeeper-server-start.sh config/zookeeper.properties

接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。

$ bin/kafka-server-start.sh config/server.properties

Kafka 跑起来之后,我们需要 MongoDB。它很简单,只需要使用这个 docker-compose.yml 即可。

version: '3'services:  mongodb:    image: mongo    ports:      - "27017:27017"    volumes:      - "mongodata:/data/db"    networks:      - network1volumes:   mongodata:networks:   network1:

使用 Docker Compose 去运行 MongoDB docker 容器。

docker-compose up

这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

func jobsPostHandler(w http.ResponseWriter, r *http.Request) {    //Retrieve body from http request    b, err := ioutil.ReadAll(r.Body)    defer r.Body.Close()    if err != nil {        panic(err)    }    //Save data into Job struct    var _job Job    err = json.Unmarshal(b, &_job)    if err != nil {        http.Error(w, err.Error(), 500)        return    }    saveJobToKafka(_job)    //Convert job struct into json    jsonString, err := json.Marshal(_job)    if err != nil {        http.Error(w, err.Error(), 500)        return    }    //Set content-type http header    w.Header().Set("content-type", "application/json")    //Send back data as response    w.Write(jsonString)}func saveJobToKafka(job Job) {    fmt.Println("save to kafka")    jsonString, err := json.Marshal(job)    jobString := string(jsonString)    fmt.Print(jobString)    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})    if err != nil {        panic(err)    }    // Produce messages to topic (asynchronously)    topic := "jobs-topic1"    for _, word := range []string{string(jobString)} {        p.Produce(&kafka.Message{            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},            Value:          []byte(word),        }, nil)    }}

这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据:

kafka-to-mongo/kafka-mongo-sample.go

func main() {    //Create MongoDB session    session := initialiseMongo()    mongoStore.session = session    receiveFromKafka()}func receiveFromKafka() {    fmt.Println("Start receiving from Kafka")    c, err := kafka.NewConsumer(&kafka.ConfigMap{        "bootstrap.servers": "localhost:9092",        "group.id":          "group-id-1",        "auto.offset.reset": "earliest",    })    if err != nil {        panic(err)    }    c.SubscribeTopics([]string{"jobs-topic1"}, nil)    for {        msg, err := c.ReadMessage(-1)        if err == nil {            fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))            job := string(msg.Value)            saveJobToMongo(job)        } else {            fmt.Printf("Consumer error: %v (%v)\n", err, msg)            break        }    }    c.Close()}func saveJobToMongo(jobString string) {    fmt.Println("Save to MongoDB")    col := mongoStore.session.DB(database).C(collection)    //Save data into Job struct    var _job Job    b := []byte(jobString)    err := json.Unmarshal(b, &_job)    if err != nil {        panic(err)    }    //Insert job into MongoDB    errMongo := col.Insert(_job)    if errMongo != nil {        panic(errMongo)    }    fmt.Printf("Saved to MongoDB : %s", jobString)}

我们来演示一下,运行微服务 1。确保 Kafka 已经运行了。

$ go run rest-kafka-sample.go

我使用 Postman 向微服务 1 发送数据。

 

15391611229382d2cb1b6c6

 

 

 

 

这里是日志,你可以在微服务 1 中看到。当你看到这些的时候,说明已经接收到了来自 Postman 发送的数据,并且已经保存到了 Kafka。

 

1539161123008bd3fbc4329

 

 

 

 

因为我们尚未运行微服务 2,数据被微服务 1 只保存在了 Kafka。我们来消费它并通过运行的微服务 2 来将它保存到 MongoDB。

$ go run kafka-mongo-sample.go

现在,你将在微服务 2 上看到消费的数据,并将它保存到了 MongoDB。

 

15391611231082453600024

 

 

 

 

检查一下数据是否保存到了 MongoDB。如果有数据,我们成功了!

 

153916112346570f66ff6ce

 

欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

 

转载于:https://my.oschina.net/u/3959468/blog/2240033

你可能感兴趣的文章
梦幻星空动画
查看>>
用Easing函数实现碰撞效果
查看>>
Python简介
查看>>
泛函编程(13)-无穷数据流-Infinite Stream
查看>>
XML与HTML
查看>>
[Java 泥水匠] Java Components 之二:算法篇之项目实践中的位运算符(有你不懂的哦)...
查看>>
[android]android自动化测试十之单元测试实例
查看>>
Java SecurityManager
查看>>
谁说阿里云不能跑Oracle,让驻云架构师告诉你怎么办!
查看>>
[LeetCode]*84.Largest Rectangle in Histogram
查看>>
[华为机试练习题]8.汽水瓶
查看>>
PostgreSQL 某单机插入性能测试 1200万行/s, 4.2GB/s
查看>>
taskset - retrieve or set a process's CPU affinity (affect SYSTEMTAP TIME)
查看>>
坏消息:Flutter官方暂时不会开发热更新(Code push)了。
查看>>
webpack4.x实战四,js和css独立打包
查看>>
数据一致性(一) - 接口调用一致性
查看>>
使用 core.js 解决 GraphQL Mock Server 跨域问题
查看>>
达文西,我要的是属性节点,不是属性!
查看>>
webpack4搭建的react全家桶example项目
查看>>
二十二、zookeeper实现分布式锁
查看>>