ReadDirectoryChangesW 停止处理大量文件

ReadDirectoryChangesW stops working on large amount of Files

提问人:Kevin 提问时间:11/17/2023 最后编辑:Kevin 更新时间:11/18/2023 访问量:86

问:

我正在将 与 一起使用,以便在将新文件添加到特定文件夹时接收通知。ReadDirectoryChangesWIOCompletionQueue

我目前面临的问题是,如果发生大规模的批量复制操作(大约 800 个文件),我的程序就会停止工作。

我调试了程序并认识到它在调用时总是失败。函数调用返回 ,但传入的变量设置为 。GetQueuedCompletionStatusTRUElpNumberOfBytesTransfered0

这是我使用的代码(我也愿意接受任何建议和帮助,因为我不是 Windows API 和 ReadDirectoryChangesW 方面的专家)

//FileIOAdapterImpl
HandlePtr FileIoAdapterImpl::CreateIoCompletionPortWrapper() {
  HandlePtr io_handle{ CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0) };
  return io_handle;
}


bool FileIoAdapterImpl::AssociateWithFileHandle(HANDLE directory, HANDLE existing_completion_port) {
  return CreateIoCompletionPort(directory, existing_completion_port, (ULONG_PTR)directory, 1);
}

bool FileIoAdapterImpl::GetQueuedCompletionStatusWrapper(HANDLE completion_token,
  LPDWORD bytes_transferred, PULONG_PTR completion_key, LPOVERLAPPED* overlap) {
  return GetQueuedCompletionStatus(completion_token, bytes_transferred, completion_key, overlap, 16);

}

HandlePtr FileIoAdapterImpl::CreateFileWrapper(const std::string& path) {
  HandlePtr dir_handle{
    CreateFileA(path.c_str(), FILE_LIST_DIRECTORY,
                FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL,
                OPEN_EXISTING,
                FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL)
  };

  return dir_handle;
}
bool FileIoAdapterImpl::ReadDirectoryChangesWrapper(HANDLE directory, std::vector<std::byte>& buffer, LPDWORD bytes_returned, OVERLAPPED* overlapped) {
  return ReadDirectoryChangesW(directory, buffer.data(), buffer.size(), FALSE, FILE_NOTIFY_CHANGE_FILE_NAME, bytes_returned, overlapped, nullptr);
}

这个类只是我使用的 WindowsAPI 函数的包装器(用于测试/模拟目的)。它只是一个带有自定义删除器来调用函数的删除器。HandlePtrunique_ptrCloseHandle

//ModernDirectoryWatcher.h
class ModernDirectoryWatcher {
public:
  using Callback = std::function<void(const std::string& filename)>;

  ModernDirectoryWatcher(const std::filesystem::path& input_directory, std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper = std::make_shared<FileIoAdapterImpl>());

  explicit operator bool() const {
    return is_valid_;
  }

  bool watch();

  void stop();

private:

  bool event_recv();
  bool event_send();
  void handle_events();
  bool has_event() const {
    return event_buf_len_ready_ != 0;
  }
  bool is_processable_file(const std::filesystem::path& path) const;

  Poco::Logger& logger_{ Poco::Logger::get("ModernDirectoryWatcher") };
  std::filesystem::path input_directory_;
  std::vector<std::string> file_types_{};
  Callback callback_;
  FileIOAdapterPtr win_io_api_wrapper_;
  HandlePtr path_handle_;
  HandlePtr event_completion_token_;
  unsigned long event_buf_len_ready_{ 0 };
  bool is_valid_{ false };
  OVERLAPPED event_overlap_{};
  std::vector<std::byte> event_buf_{ 64 * 1024 };
};
//ModernDirectoryWatcher.cpp
ModernDirectoryWatcher::ModernDirectoryWatcher(const std::filesystem::path& input_directory,
  std::vector<std::string> file_types, Callback callback, FileIOAdapterPtr io_wrapper)
  : input_directory_(input_directory),
  file_types_(std::move(file_types)),
  callback_(std::move(callback)),
  win_io_api_wrapper_(std::move(io_wrapper)) {

  path_handle_ = win_io_api_wrapper_->CreateFileWrapper(input_directory_.string());

  if (path_handle_.get() != INVALID_HANDLE_VALUE) {
    poco_information(logger_, "Create Completition Token");
    event_completion_token_ = win_io_api_wrapper_->CreateIoCompletionPortWrapper();
  }

  if (event_completion_token_.get() != nullptr) {
    poco_information(logger_, "Associate with the FileHandle");
    is_valid_ = win_io_api_wrapper_->AssociateWithFileHandle(path_handle_.get(), event_completion_token_.get());
  }
}

bool ModernDirectoryWatcher::watch() {
  poco_information(logger_, "Start Watching...");
  if (is_valid_) {
    poco_information(logger_, "Receive Events");
    event_recv();

    while (is_valid_ && has_event()) {
      poco_information(logger_, "There are still events to process...");
      event_send();
    }

    while (is_valid_) {
      ULONG_PTR completion_key{ 0 };
      LPOVERLAPPED overlap{ 0 };
      bool complete = win_io_api_wrapper_->GetQueuedCompletionStatusWrapper(event_completion_token_.get(), &event_buf_len_ready_, &completion_key, &overlap);
      if (complete && event_buf_len_ready_ == 0) {
        poco_error(logger_, "Error"); // HERE THE ERROR IS PRINTED 
      }
      if (complete && overlap) {
        poco_information(logger_, "Handle the events");
        handle_events();
      } else if (int err_code = GetLastError() != 258 && !complete) {
        poco_error(logger_, "Error");
      }
    }
    return true;
  } else {
    return false;
  }
}

void ModernDirectoryWatcher::handle_events() {
  while (is_valid_ && has_event()) {
    poco_information(logger_, "Send Event");
    event_send();
    poco_information(logger_, "Receive Events");
    event_recv();
  }
}

void ModernDirectoryWatcher::stop() {
  poco_notice(logger_, "Stop the Watcher");
  is_valid_ = false;
}

bool ModernDirectoryWatcher::event_recv() {
  event_buf_len_ready_ = 0;
  DWORD bytes_returned = 0;
  memset(&event_overlap_, 0, sizeof(OVERLAPPED));
  poco_information(logger_, "Call the ReadDirectoryChanges");
  auto read_ok = win_io_api_wrapper_->ReadDirectoryChangesWrapper(path_handle_.get(), event_buf_, &bytes_returned, &event_overlap_);

 
  if (!event_buf_.empty() && read_ok) {
    event_buf_len_ready_ = bytes_returned > 0 ? bytes_returned : 0;
    poco_information_f1(logger_, "Event Buffer Len: %?d", event_buf_len_ready_);
    return true;
  }

  if (GetLastError() == ERROR_IO_PENDING) {
    poco_error(logger_, "Error Pending IO Received stopping...");
    event_buf_len_ready_ = 0;
    is_valid_ = false;
  } else {
    poco_error_f1(logger_, "Error Code: %?d", GetLastError());
  }
  return false;
}

bool ModernDirectoryWatcher::event_send() {
  auto buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(event_buf_.data());
  if (is_valid_) {
    while (buf + sizeof(FILE_NOTIFY_INFORMATION) <= buf + event_buf_len_ready_) {
      poco_information(logger_, "Get valid Buffer...");
      auto filename = input_directory_ / std::wstring{ buf->FileName, buf->FileNameLength / 2 };
      if ((buf->Action == FILE_ACTION_ADDED || buf->Action == FILE_ACTION_RENAMED_NEW_NAME) && is_processable_file(filename)) {
        callback_(filename.string());
      }

      if (buf->NextEntryOffset == 0) {
        break;
      }

      buf = reinterpret_cast<FILE_NOTIFY_INFORMATION*>(reinterpret_cast<std::byte*>(buf) + buf->NextEntryOffset);

    }
    return true;
  } else {
    return false;
  }
}

bool ModernDirectoryWatcher::is_processable_file(const std::filesystem::path& path) const {
  std::string extension = path.extension().string().erase(0, 1);
  extension = Poco::toLower(extension);
  return std::find(file_types_.begin(), file_types_.end(), extension) != file_types_.end();
}

我将该观察程序作为应用程序中的线程启动,如下所示:

ModernDirectoryWatcher directory_watcher{dir, ftypes, [this](const std::string& filename) {...})}
std::thread watcher_thread = std::thread{&ModernDirectoryWatcher::watch, &directory_watcher}

如果我将更多的文件(~800)文件复制到监视的目录,代码会打印出错误,然后如果我再次添加文件(例如,只有一个文件),则无法再识别该文件。我知道发生这种情况是因为我检查函数是否大于,如果返回,则通常应该是什么情况。但是,如果我删除该检查,我就会开始从缓冲区中取出垃圾。has_eventevent_buf_len_ready_0GetQueuedCompletionStatusTRUE

编辑:

关于我在调试过程中看到的东西。如果我在 里面设置一个断点并查看结构,我看到该属性的值为 268。但是我做了一个快速,但它没有找到该号码的错误消息。if (complete && event_buf_len_ready_ == 0)OVERLAPPEDinternalnet helpmsg 268

C++ 多线程 WinAPI

评论

0赞 RbMm 11/17/2023
可能你得到了(这表示通知更改请求正在完成,并且信息没有在调用方的缓冲区中返回)。STATUS_NOTIFY_ENUM_DIRGetQueuedCompletionStatusBindIoCompletionCallback
0赞 RbMm 11/17/2023
寻找 stackoverflow.com/questions/14801950/...
0赞 Kevin 11/18/2023
@RbMm我认为这是与上述链接中相同的错误,但事实并非如此,我也编辑了问题以提及那里的信息STATUS_NOTIFY_ENUM_DIR
0赞 RbMm 11/18/2023
-不。正是这个值是 268STATUS_NOTIFY_ENUM_DIR
0赞 Kevin 11/18/2023
好的,是的,如果缓冲区溢出,那么手动检查文件夹是有意义的。

答:

1赞 MSalters 11/17/2023 #1

这是记录在案的行为。

When you first call ReadDirectoryChangesW, the system allocates a buffer to
store change information. This buffer is associated with the directory handle until
it is closed and its size does not change during its lifetime. Directory changes
that occur between calls to this function are added to the buffer and then
returned with the next call.

If the buffer overflows, ReadDirectoryChangesW will still return true, but the
entire contents of the buffer are discarded and the lpBytesReturned parameter
will be zero, which indicates that your buffer was too small to hold all of the
changes that occurred.

在这种情况下,您必须假设更改如此之大,以至于从头开始重新读取目录是更好的方法。

评论

0赞 Kevin 11/18/2023
我也考虑过这一点,但是我可以传递给的缓冲区的最大大小是多少?目前我创建这样的缓冲区:ReadDirectoryChangesWstd::vector<std::byte> buffer{64*1024}