// Copyright (c) 2020 Mobvoi Inc (Binbin Zhang) // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef UTILS_BLOCKING_QUEUE_H_ #define UTILS_BLOCKING_QUEUE_H_ #include #include #include #include #include #include #include "wn_utils.h" namespace wenet { template class BlockingQueue { public: explicit BlockingQueue(size_t capacity = std::numeric_limits::max()) : capacity_(capacity) {} void Push(const T& value) { { std::unique_lock lock(mutex_); while (queue_.size() >= capacity_) { not_full_condition_.wait(lock); } queue_.push(value); } not_empty_condition_.notify_one(); } void Push(T&& value) { { std::unique_lock lock(mutex_); while (queue_.size() >= capacity_) { not_full_condition_.wait(lock); } queue_.push(std::move(value)); } not_empty_condition_.notify_one(); } void Push(const std::vector& values) { { std::unique_lock lock(mutex_); for (auto& value : values) { while (queue_.size() >= capacity_) { not_empty_condition_.notify_one(); not_full_condition_.wait(lock); } queue_.push(value); } } not_empty_condition_.notify_one(); } void Push(std::vector&& values) { std::unique_lock lock(mutex_); for (auto& value : values) { while (queue_.size() >= capacity_) { not_empty_condition_.notify_one(); not_full_condition_.wait(lock); } queue_.push(std::move(value)); } not_empty_condition_.notify_one(); } T Pop() { std::unique_lock lock(mutex_); while (queue_.empty()) { not_empty_condition_.wait(lock); } T t(std::move(queue_.front())); queue_.pop(); not_full_condition_.notify_one(); return t; } // num can be greater than capacity,but it needs to be used with care std::vector Pop(size_t num) { std::unique_lock lock(mutex_); std::vector block_data; while (block_data.size() < num) { while (queue_.empty()) { not_full_condition_.notify_one(); not_empty_condition_.wait(lock); } block_data.push_back(std::move(queue_.front())); queue_.pop(); } not_full_condition_.notify_one(); return block_data; } bool Empty() const { std::lock_guard lock(mutex_); return queue_.empty(); } size_t Size() const { std::lock_guard lock(mutex_); return queue_.size(); } void Clear() { while (!Empty()) { Pop(); } } private: size_t capacity_; mutable std::mutex mutex_; std::condition_variable not_full_condition_; std::condition_variable not_empty_condition_; std::queue queue_; public: WENET_DISALLOW_COPY_AND_ASSIGN(BlockingQueue); }; } // namespace wenet #endif // UTILS_BLOCKING_QUEUE_H_