You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

139 lines
3.4 KiB

  1. // Copyright (c) 2020 Mobvoi Inc (Binbin Zhang)
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. #ifndef UTILS_BLOCKING_QUEUE_H_
  15. #define UTILS_BLOCKING_QUEUE_H_
  16. #include <condition_variable>
  17. #include <limits>
  18. #include <mutex>
  19. #include <queue>
  20. #include <utility>
  21. #include <vector>
  22. #include "wn_utils.h"
  23. namespace wenet {
  24. template <typename T>
  25. class BlockingQueue {
  26. public:
  27. explicit BlockingQueue(size_t capacity = std::numeric_limits<int>::max())
  28. : capacity_(capacity) {}
  29. void Push(const T& value) {
  30. {
  31. std::unique_lock<std::mutex> lock(mutex_);
  32. while (queue_.size() >= capacity_) {
  33. not_full_condition_.wait(lock);
  34. }
  35. queue_.push(value);
  36. }
  37. not_empty_condition_.notify_one();
  38. }
  39. void Push(T&& value) {
  40. {
  41. std::unique_lock<std::mutex> lock(mutex_);
  42. while (queue_.size() >= capacity_) {
  43. not_full_condition_.wait(lock);
  44. }
  45. queue_.push(std::move(value));
  46. }
  47. not_empty_condition_.notify_one();
  48. }
  49. void Push(const std::vector<T>& values) {
  50. {
  51. std::unique_lock<std::mutex> lock(mutex_);
  52. for (auto& value : values) {
  53. while (queue_.size() >= capacity_) {
  54. not_empty_condition_.notify_one();
  55. not_full_condition_.wait(lock);
  56. }
  57. queue_.push(value);
  58. }
  59. }
  60. not_empty_condition_.notify_one();
  61. }
  62. void Push(std::vector<T>&& values) {
  63. std::unique_lock<std::mutex> lock(mutex_);
  64. for (auto& value : values) {
  65. while (queue_.size() >= capacity_) {
  66. not_empty_condition_.notify_one();
  67. not_full_condition_.wait(lock);
  68. }
  69. queue_.push(std::move(value));
  70. }
  71. not_empty_condition_.notify_one();
  72. }
  73. T Pop() {
  74. std::unique_lock<std::mutex> lock(mutex_);
  75. while (queue_.empty()) {
  76. not_empty_condition_.wait(lock);
  77. }
  78. T t(std::move(queue_.front()));
  79. queue_.pop();
  80. not_full_condition_.notify_one();
  81. return t;
  82. }
  83. // num can be greater than capacity,but it needs to be used with care
  84. std::vector<T> Pop(size_t num) {
  85. std::unique_lock<std::mutex> lock(mutex_);
  86. std::vector<T> block_data;
  87. while (block_data.size() < num) {
  88. while (queue_.empty()) {
  89. not_full_condition_.notify_one();
  90. not_empty_condition_.wait(lock);
  91. }
  92. block_data.push_back(std::move(queue_.front()));
  93. queue_.pop();
  94. }
  95. not_full_condition_.notify_one();
  96. return block_data;
  97. }
  98. bool Empty() const {
  99. std::lock_guard<std::mutex> lock(mutex_);
  100. return queue_.empty();
  101. }
  102. size_t Size() const {
  103. std::lock_guard<std::mutex> lock(mutex_);
  104. return queue_.size();
  105. }
  106. void Clear() {
  107. while (!Empty()) {
  108. Pop();
  109. }
  110. }
  111. private:
  112. size_t capacity_;
  113. mutable std::mutex mutex_;
  114. std::condition_variable not_full_condition_;
  115. std::condition_variable not_empty_condition_;
  116. std::queue<T> queue_;
  117. public:
  118. WENET_DISALLOW_COPY_AND_ASSIGN(BlockingQueue);
  119. };
  120. } // namespace wenet
  121. #endif // UTILS_BLOCKING_QUEUE_H_