如何在客户端异步读取服务器的数据(数据包)?

How can I asynchronously read the data(packet) from the server on the client side?

提问人:TheCoder 提问时间:7/24/2023 最后编辑:Martin YorkTheCoder 更新时间:7/24/2023 访问量:49

问:

我已经处理了大约 3 天的客户端和服务器代码的实现,我在 gRPC 协议中使用 c++ 语言编写了服务器代码,但问题是我无法读取异步发送的数据包中的数据(你可以把它想象成 10,000 个 4 字节的数据)与客户端的读取函数相同。代码如下。

非常感谢您的帮助。 亲切问候

服务器代码:

class ServerImpl final
{
public:
    ~ServerImpl()
    {
        server_->Shutdown();
        pcq_->Shutdown();
    }

    void Run()
    {
        std::string server_address("0.0.0.0:50051");

        ServerBuilder builder;
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service_);

        pcq_ = builder.AddCompletionQueue();
        server_ = builder.BuildAndStart();
        std::cout << "Server listening on " << server_address << std::endl;

        HandleRpcs();
    }

private:
    class CallData
    {
    public:
        CallData(MultiGreeter::AsyncService* service, ServerCompletionQueue* cq)
            : service_(service)
            , cq_(cq)
            , responder_(&ctx_)
            , status_(CREATE)
            , times_(0)
        {
            Proceed();
        }

        void Proceed()
        {
            if (status_ == CREATE)
            {
                status_ = PROCESS;
                service_->RequestsayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
            }
            else if (status_ == PROCESS)
            {
                // Now that we go through this stage multiple times, 
                // we don't want to create a new instance every time.
                // Refer to gRPC's original example if you don't understand 
                // why we create a new instance of CallData here.

                if (times_ == 0)
                {
                    new CallData(service_, cq_);
                }

                if (times_++ >= 3)
                {
                    status_ = FINISH;
                }
                else
                {
                    for (size_t i = 0; i < 65536; i++)
                    {
                        reply_.add_data(i*times_);
                    }
                    responder_.Write(reply_, this);
                }
            }

            else if(status_ == FINISH)
            {
                status_ = DESTROY;
                responder_.Finish(Status::OK, this);
            }

            else
            {
                GPR_ASSERT(status_ == DESTROY);
                delete this;
            }
        }

    private:
        MultiGreeter::AsyncService* service_;
        ServerCompletionQueue* cq_;
        ServerContext ctx_;
        FirstRequest request_;
        FirstReply reply_;
        ServerAsyncWriter<FirstReply> responder_;

        int times_;

        enum CallStatus
        {
            CREATE,
            PROCESS,
            FINISH,
            DESTROY
        };
        CallStatus status_; // The current serving state.
    };

    void HandleRpcs()
    {
        new CallData(&service_, pcq_.get());
        void* tag; // uniquely identifies a request.
        bool ok;


        while (true)
        {
            GPR_ASSERT(pcq_->Next(&tag, &ok));
            GPR_ASSERT(ok);
            static_cast<CallData*>(tag)->Proceed();
        }
    }

    std::unique_ptr<ServerCompletionQueue> pcq_;
    MultiGreeter::AsyncService service_;
    std::unique_ptr<Server> server_;
};

客户代码:

class GreeterClient
{
public:
    GreeterClient(std::shared_ptr<Channel> channel)
        : stub_(MultiGreeter::NewStub(channel))
    {}

    void SayHello(int32_t number)
    {
        FirstRequest request;
        request.set_reqone(100);

        auto* call = new AsyncClientCall;
        call->FirstReader_ = stub_->PrepareAsyncsayHello(&call->Context_,request, &queue_);
        call->FirstReader_ ->StartCall((void*)call);        
        call->FirstReader_ ->Finish(&call->status_,(void*)call);
        
        /*ClientContext context;
        std::unique_ptr<ClientReader<FirstReply>> reader(stub_->sayHello(&context, request));

        FirstReply reply;
        auto start = std::chrono::steady_clock::now();
        int data = 0;

        while (reader->Read(&reply)) 
        {
           // for (auto& val : reply.data())
           // {
                ++data;
                //ofs << "Got reply: " << val << std::endl;
            //}        
        }

        auto end = std::chrono::steady_clock::now();
        std::cout << "[CLIENT TIME]: " << std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count() << " ms" << std::endl;
        std::cout << "[DATA COUNT]:" << data << std::endl;

        Status status = reader->Finish();
        if (status.ok()) 
        {
            std::cout << "sayHello rpc succeeded." << std::endl;
        } 
        else 
        {
            std::cout << "sayHello rpc failed." << std::endl;
            std::cout << status.error_code() << ": " << status.error_message() << std::endl;
        }*/

    }


void AsyncCompleteRPC()
{
    void* tag;
    bool ok = false;
    std::cout << "queue_.Next(&tag, &ok) : " << queue_.Next(&tag, &ok) << std::endl;
    while(queue_.Next(&tag, &ok))
    {
        std::cout << "step1\n";
        if(!ok)
        {
            std::cerr << "Something went wrong " << std::endl;
        }
        long int datacount = 0;
        auto* call = static_cast<AsyncClientCall*>(tag);
        //call->FirstReader_ ->Read(&call->FirstReply_,(void*)call);
        
        std::string err;
        std::cout << "step2\n";
        if (call) 
        {
        std::cout << "step3\n";


            if (call->status_.ok()) 
            {
                
        std::cout << "step4\n";

                //std::cout << "Client received: " << call->FirstReply_.data(1) << std::endl;
                    std::cout << "my_tag : " << tag << std::endl;
                    std::cout << "static_cast<void*>(call->FirstReader_.get()) : " << static_cast<void*>(call->FirstReader_.get()) << std::endl;
                if (tag == static_cast<void*>(call->FirstReader_.get())) 
                {
                    std::cout << "my_tag : " << my_tag << std::endl;
                    for (auto & val : call->FirstReply_.data())
                    {
                        ++datacount;   
                        std::cout <<  "[DATA] : " << val << std::endl;
                    }
                    call->FirstReader_->Read(&(call->FirstReply_),tag);
                }

                std::cout << "readed data count : " << datacount << std::endl;
                                        
                
            } else {
                std::cerr << call->status_.error_code() << ": " << call->status_.error_message() << std::endl;
                std::cout << "Client received: RPC failed" << std::endl;
            }
        } 
        else 
        {
            err = "A client call was deleted";
        }

        delete call;

        if (!err.empty()) {
            throw std::runtime_error(err);
        }
    }
    }

   
private:
    std::unique_ptr<MultiGreeter::Stub> stub_;
    CompletionQueue  queue_;
    struct AsyncClientCall
    {
        ClientContext Context_;
        FirstReply FirstReply_;
        Status status_;
        std::unique_ptr<ClientAsyncReader<FirstReply>>  FirstReader_;
    };

    void* my_tag;
};

我尝试同步读取客户端,但没有完成队列,读取成功,但这次读取发生得太多,所以如果我发送 2 个数据包,它正在读取 3 个数据包。我不明白这个问题

C++ 异步 客户端-服务器 gRPC StreamReader

评论

0赞 TheCoder 9/27/2023
我解决了这个问题。在代码行responder_后,解决方案在服务器端紧随其后。写(reply_,这个);ı 应该添加 reply_.clear_data(),因为变量是一个数组,在发送每个数据包之前,应该删除数组的元素。

答: 暂无答案