提问人:TheCoder 提问时间:7/24/2023 最后编辑:Martin YorkTheCoder 更新时间:7/24/2023 访问量:49
如何在客户端异步读取服务器的数据(数据包)?
How can I asynchronously read the data(packet) from the server on the client side?
问:
我已经处理了大约 3 天的客户端和服务器代码的实现,我在 gRPC 协议中使用 c++ 语言编写了服务器代码,但问题是我无法读取异步发送的数据包中的数据(你可以把它想象成 10,000 个 4 字节的数据)与客户端的读取函数相同。代码如下。
非常感谢您的帮助。 亲切问候
服务器代码:
class ServerImpl final
{
public:
~ServerImpl()
{
server_->Shutdown();
pcq_->Shutdown();
}
void Run()
{
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service_);
pcq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
HandleRpcs();
}
private:
class CallData
{
public:
CallData(MultiGreeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service)
, cq_(cq)
, responder_(&ctx_)
, status_(CREATE)
, times_(0)
{
Proceed();
}
void Proceed()
{
if (status_ == CREATE)
{
status_ = PROCESS;
service_->RequestsayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
}
else if (status_ == PROCESS)
{
// Now that we go through this stage multiple times,
// we don't want to create a new instance every time.
// Refer to gRPC's original example if you don't understand
// why we create a new instance of CallData here.
if (times_ == 0)
{
new CallData(service_, cq_);
}
if (times_++ >= 3)
{
status_ = FINISH;
}
else
{
for (size_t i = 0; i < 65536; i++)
{
reply_.add_data(i*times_);
}
responder_.Write(reply_, this);
}
}
else if(status_ == FINISH)
{
status_ = DESTROY;
responder_.Finish(Status::OK, this);
}
else
{
GPR_ASSERT(status_ == DESTROY);
delete this;
}
}
private:
MultiGreeter::AsyncService* service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
FirstRequest request_;
FirstReply reply_;
ServerAsyncWriter<FirstReply> responder_;
int times_;
enum CallStatus
{
CREATE,
PROCESS,
FINISH,
DESTROY
};
CallStatus status_; // The current serving state.
};
void HandleRpcs()
{
new CallData(&service_, pcq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true)
{
GPR_ASSERT(pcq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> pcq_;
MultiGreeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
客户代码:
class GreeterClient
{
public:
GreeterClient(std::shared_ptr<Channel> channel)
: stub_(MultiGreeter::NewStub(channel))
{}
void SayHello(int32_t number)
{
FirstRequest request;
request.set_reqone(100);
auto* call = new AsyncClientCall;
call->FirstReader_ = stub_->PrepareAsyncsayHello(&call->Context_,request, &queue_);
call->FirstReader_ ->StartCall((void*)call);
call->FirstReader_ ->Finish(&call->status_,(void*)call);
/*ClientContext context;
std::unique_ptr<ClientReader<FirstReply>> reader(stub_->sayHello(&context, request));
FirstReply reply;
auto start = std::chrono::steady_clock::now();
int data = 0;
while (reader->Read(&reply))
{
// for (auto& val : reply.data())
// {
++data;
//ofs << "Got reply: " << val << std::endl;
//}
}
auto end = std::chrono::steady_clock::now();
std::cout << "[CLIENT TIME]: " << std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count() << " ms" << std::endl;
std::cout << "[DATA COUNT]:" << data << std::endl;
Status status = reader->Finish();
if (status.ok())
{
std::cout << "sayHello rpc succeeded." << std::endl;
}
else
{
std::cout << "sayHello rpc failed." << std::endl;
std::cout << status.error_code() << ": " << status.error_message() << std::endl;
}*/
}
void AsyncCompleteRPC()
{
void* tag;
bool ok = false;
std::cout << "queue_.Next(&tag, &ok) : " << queue_.Next(&tag, &ok) << std::endl;
while(queue_.Next(&tag, &ok))
{
std::cout << "step1\n";
if(!ok)
{
std::cerr << "Something went wrong " << std::endl;
}
long int datacount = 0;
auto* call = static_cast<AsyncClientCall*>(tag);
//call->FirstReader_ ->Read(&call->FirstReply_,(void*)call);
std::string err;
std::cout << "step2\n";
if (call)
{
std::cout << "step3\n";
if (call->status_.ok())
{
std::cout << "step4\n";
//std::cout << "Client received: " << call->FirstReply_.data(1) << std::endl;
std::cout << "my_tag : " << tag << std::endl;
std::cout << "static_cast<void*>(call->FirstReader_.get()) : " << static_cast<void*>(call->FirstReader_.get()) << std::endl;
if (tag == static_cast<void*>(call->FirstReader_.get()))
{
std::cout << "my_tag : " << my_tag << std::endl;
for (auto & val : call->FirstReply_.data())
{
++datacount;
std::cout << "[DATA] : " << val << std::endl;
}
call->FirstReader_->Read(&(call->FirstReply_),tag);
}
std::cout << "readed data count : " << datacount << std::endl;
} else {
std::cerr << call->status_.error_code() << ": " << call->status_.error_message() << std::endl;
std::cout << "Client received: RPC failed" << std::endl;
}
}
else
{
err = "A client call was deleted";
}
delete call;
if (!err.empty()) {
throw std::runtime_error(err);
}
}
}
private:
std::unique_ptr<MultiGreeter::Stub> stub_;
CompletionQueue queue_;
struct AsyncClientCall
{
ClientContext Context_;
FirstReply FirstReply_;
Status status_;
std::unique_ptr<ClientAsyncReader<FirstReply>> FirstReader_;
};
void* my_tag;
};
我尝试同步读取客户端,但没有完成队列,读取成功,但这次读取发生得太多,所以如果我发送 2 个数据包,它正在读取 3 个数据包。我不明白这个问题
答: 暂无答案
评论