How to implement a thread-safe concurrent queue in about a hundred lines of C++ | concurrent queue or blocking queue implemented in cpp


This article was first published on my personal blog at https://kezunlin.me/post/cabccf5c/, where the latest version is available.

concurrent queue or blocking queue implemented in cpp


Guide

introduction

Where the producer-consumer pattern is present, it is often the case that one side is faster than the other.

Producer and consumer often communicate by queues: the producer will put items on a queue while the consumer will pop items off a queue. What happens when the queue becomes full, or empty?

One approach is for the producer to try to put an item on the queue and, if the queue is full, yield the thread and retry. Similarly, the consumer can try to pop an item off the queue and, if the queue is empty, yield and retry. This try-fail-yield approach can burn CPU cycles unnecessarily in tight loops that constantly try to put items on or pop items off a queue.
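
The try-fail-yield approach described above can be sketched as follows. This is only an illustration and not code from the original article: `BoundedQueue`, `try_put`, `try_pop` and `run_try_fail_yield` are hypothetical names, and the `failed_attempts` counter exists only to make the wasted iterations visible.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical non-blocking bounded queue: operations fail instead of waiting.
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    bool try_put(int value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.size() >= capacity_) return false;  // full: caller must retry
        queue_.push(value);
        return true;
    }

    bool try_pop(int& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return false;  // empty: caller must retry
        value = queue_.front();
        queue_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::queue<int> queue_;
    std::size_t capacity_;
};

// Moves the integers 1..count through the queue and returns their sum.
// Every failed try_put/try_pop is counted in failed_attempts.
long long run_try_fail_yield(int count, std::size_t capacity,
                             std::atomic<long long>& failed_attempts) {
    BoundedQueue queue(capacity);
    long long sum = 0;  // written only by the consumer, read after join()

    std::thread producer([&] {
        for (int i = 1; i <= count; ++i) {
            while (!queue.try_put(i)) {     // try
                ++failed_attempts;          // fail
                std::this_thread::yield();  // yield, then repeat
            }
        }
    });

    std::thread consumer([&] {
        for (int received = 0; received < count; ++received) {
            int value = 0;
            while (!queue.try_pop(value)) {
                ++failed_attempts;
                std::this_thread::yield();
            }
            sum += value;
        }
    });

    producer.join();
    consumer.join();
    return sum;
}
```

Every item arrives, but whenever one side runs ahead of the other it spins through failed attempts instead of sleeping. How large `failed_attempts` gets depends on the scheduler and the machine, so no particular value should be expected.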

Another approach is to temporarily grow the queue, but that doesn’t scale well. When do we stop growing? And once we stop we have to fall back onto the try-fail-yield method.

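What if we could implement a blocking queue: a queue whose put operation blocks while the queue is full, and whose pop operation blocks while the queue is empty?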

Quote from here

An example of using such a queue would look like this (notice a fast producer and slow consumer in the code below):
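
A minimal sketch of such a usage, assuming a bounded blocking queue built on the standard library. `DemoBlockingQueue`, `put`, `take` and `run_fast_producer_slow_consumer` are hypothetical names chosen for this illustration, and the one-millisecond sleep merely simulates slow work.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded blocking queue, used only for this illustration.
template <typename T>
class DemoBlockingQueue {
public:
    explicit DemoBlockingQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the queue is full.
    void put(const T& item) {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.size() >= capacity_) {
            not_full_.wait(lock);
        }
        queue_.push(item);
        not_empty_.notify_one();
    }

    // Blocks while the queue is empty.
    T take() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.empty()) {
            not_empty_.wait(lock);
        }
        T item = queue_.front();
        queue_.pop();
        not_full_.notify_one();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::queue<T> queue_;
    std::size_t capacity_;
};

// Fast producer, slow consumer: returns the items in the order the consumer saw them.
std::vector<int> run_fast_producer_slow_consumer(int count, std::size_t capacity) {
    DemoBlockingQueue<int> queue(capacity);
    std::vector<int> consumed;  // written only by the consumer, read after join()

    std::thread producer([&] {
        for (int i = 0; i < count; ++i) {
            queue.put(i);  // sleeps once the queue holds `capacity` items
        }
    });

    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) {
            consumed.push_back(queue.take());
            std::this_thread::sleep_for(std::chrono::milliseconds(1));  // simulate slow work
        }
    });

    producer.join();
    consumer.join();
    return consumed;
}
```

The producer never spins: once the queue holds `capacity` items it sleeps inside `put` until the consumer makes room, so the faster side is throttled to the pace of the slower one.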

blocking queue v1

//std
#include <cstddef> // size_t
#include <queue>

//boost
#include <boost/thread.hpp> // boost::mutex, boost::condition_variable

namespace my { 
namespace algorithm {

template<typename Data>
class blocking_queue // header-only template, so no export macro such as SHARED_EXPORT is needed
{
private:
	std::queue<Data> the_queue;
	mutable boost::mutex the_mutex;
	boost::condition_variable the_condition_variable;

public:
	void push(Data const& data)
	{
		boost::mutex::scoped_lock lock(the_mutex);
		the_queue.push(data);
		lock.unlock();
		the_condition_variable.notify_one();
	}

	bool empty() const
	{
		boost::mutex::scoped_lock lock(the_mutex);
		return the_queue.empty();
	}

	size_t size() const
	{
		boost::mutex::scoped_lock lock(the_mutex);
		return the_queue.size();
	}

	bool try_pop(Data& popped_value)
	{
		boost::mutex::scoped_lock lock(the_mutex);
		if (the_queue.empty())
		{
			return false;
		}

		popped_value = the_queue.front();
		the_queue.pop();
		return true;
	}

	void wait_and_pop(Data& popped_value)
	{
		boost::mutex::scoped_lock lock(the_mutex);
		while (the_queue.empty())
		{
			the_condition_variable.wait(lock);
		}

		popped_value = the_queue.front();
		the_queue.pop();
	}

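	// Wakes one waiting consumer by pushing a default-constructed value.
	// Consumers have to treat that value as the exit signal themselves.
	void signal_exit()
	{
		Data data = Data();
		push(data);
	}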

};

}
}// end namespace
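
How `signal_exit` is meant to be used is not shown in the article. The sketch below is one plausible reading: the consumer treats a default-constructed value as the exit signal. It is a standard-library port of the v1 interface rather than the Boost code above, and `std_blocking_queue` and `consume_until_exit` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Standard-library port of the v1 interface (assumption: same semantics as the Boost version).
template <typename Data>
class std_blocking_queue {
public:
    void push(Data const& data) {
        {
            std::lock_guard<std::mutex> lock(the_mutex);
            the_queue.push(data);
        }
        // Notify after releasing the lock, as v1 does.
        the_condition_variable.notify_one();
    }

    void wait_and_pop(Data& popped_value) {
        std::unique_lock<std::mutex> lock(the_mutex);
        while (the_queue.empty()) {
            the_condition_variable.wait(lock);
        }
        popped_value = the_queue.front();
        the_queue.pop();
    }

    // Wakes one consumer with a default-constructed sentinel value.
    void signal_exit() { push(Data()); }

private:
    std::queue<Data> the_queue;
    std::mutex the_mutex;
    std::condition_variable the_condition_variable;
};

// Runs one consumer until it sees the sentinel and returns the messages it handled.
std::vector<std::string> consume_until_exit() {
    std_blocking_queue<std::string> queue;
    std::vector<std::string> seen;  // written only by the consumer, read after join()

    std::thread consumer([&] {
        for (;;) {
            std::string message;
            queue.wait_and_pop(message);
            if (message.empty()) break;  // assumed convention: empty string means exit
            seen.push_back(message);
        }
    });

    queue.push("first");
    queue.push("second");
    queue.signal_exit();
    consumer.join();
    return seen;
}
```

Two caveats follow from this design. Each consumer thread needs its own `signal_exit` call, because one sentinel wakes only one consumer. A genuine payload that equals the default value (here an empty string) would also be mistaken for the exit signal.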


blocking queue v2


#pragma once
#include <cassert>
#include <cstddef> // size_t

#include <queue>
#include <mutex>
#include <condition_variable>

#define MAX_CAPACITY 20

namespace my {
namespace algorithm {

template<typename T>
class BlockingQueue // header-only template, so no export macro such as SHARED_EXPORT is needed
{
public:
	BlockingQueue() 
	:mtx(), full_(), empty_(), capacity_(MAX_CAPACITY) { }


	void Push(const T& data){
		std::unique_lock<std::mutex> lock(mtx);
		// >= rather than ==: SetCapacity() may shrink capacity_ below the current size
		while(queue_.size() >= capacity_){
			full_.wait(lock );
		}

		assert(queue_.size() < capacity_);
		queue_.push(data);
		empty_.notify_all(); 
	}

	T Pop(){
		std::unique_lock<std::mutex> lock(mtx);
		while(queue_.empty()){
			empty_.wait(lock );
		}

		assert(!queue_.empty());
		T front(queue_.front());
		queue_.pop();
		full_.notify_all();
		return front;
	}

	T Front(){
		std::unique_lock<std::mutex> lock(mtx);
		while(queue_.empty()){
			empty_.wait(lock );
		}

		assert(!queue_.empty());
		T front(queue_.front());
		return front;
	}

	T Back(){
		std::unique_lock<std::mutex> lock(mtx);
		while(queue_.empty()){
			empty_.wait(lock );
		}

		assert(!queue_.empty());
		T back(queue_.back());
		return back;
	}

	size_t Size() const {
		std::lock_guard<std::mutex> lock(mtx);
		return queue_.size();
	}

	bool Empty() const {
		std::lock_guard<std::mutex> lock(mtx);
		return queue_.empty();
	}

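	void SetCapacity(const size_t capacity){
		{
			// Push() reads capacity_ under mtx, so it must be written under mtx too
			std::lock_guard<std::mutex> lock(mtx);
			capacity_ = (capacity > 0 ? capacity : MAX_CAPACITY);
		}
		// a larger capacity may unblock producers waiting in Push()
		full_.notify_all();
	}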

private:
	// non-copyable: the mutex and condition variables cannot be copied
	BlockingQueue(const BlockingQueue& rhs) = delete;
	BlockingQueue& operator= (const BlockingQueue& rhs) = delete;

private:
	mutable std::mutex mtx;
	std::condition_variable full_;
	std::condition_variable empty_;
	std::queue<T> queue_;
	size_t capacity_; 
};


}
}// end namespace
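
The `while` loops around `wait` in v2 re-check the condition after every wakeup, which guards against spurious wakeups. `std::condition_variable::wait` also has an overload that takes a predicate and performs the same loop internally. The sketch below shows that form under assumed names (`PredicateBlockingQueue`, `not_full_`, `not_empty_`). It is a reduced variant for illustration, not the article's class.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical reduced variant of the v2 queue using the predicate overload of wait().
template <typename T>
class PredicateBlockingQueue {
public:
    explicit PredicateBlockingQueue(std::size_t capacity) : capacity_(capacity) {}

    void Push(const T& data) {
        std::unique_lock<std::mutex> lock(mtx_);
        // Same as: while (!(queue_.size() < capacity_)) not_full_.wait(lock);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push(data);
        not_empty_.notify_one();
    }

    T Pop() {
        std::unique_lock<std::mutex> lock(mtx_);
        // Same as: while (queue_.empty()) not_empty_.wait(lock);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T front(queue_.front());
        queue_.pop();
        not_full_.notify_one();
        return front;
    }

    std::size_t Size() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return queue_.size();
    }

private:
    mutable std::mutex mtx_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::queue<T> queue_;
    std::size_t capacity_;
};
```

`notify_one` is sufficient here only because every waiter on `not_empty_` removes an item. In v2, `Front()` and `Back()` wait on the same condition variable without removing anything, so a single notification could be taken by one of them while a `Pop()` caller stays asleep. That is a good reason for v2 to keep `notify_all`.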

