基础工具之消息队列、线程池、缓冲区抽象、事件循环和日志实现

  正所谓“工欲善其事,必先利其器”, 咱们在实现通讯设计任务的过程当中须要一些基础工具来帮助咱们搭建部分基础组件,这些基础工具包括消息队列,线程池,缓冲区抽象,事件循环和日志工具。接下来对这部分基础工具进行描述和实现。linux

1. 消息队列

1.1 linux消息队列应用

(1)成员函数和数据结构  ios

struct msgbuf{
    long mtype;
    char mtext[1]; 
}

// msgid_ds内核数据结构
struct msgid_ds
// 生成惟一的键
key ftok(const char *pathname, int proj_id)

int msgget(key_t key, int msgflg)
msgflg是一个标志参数:
* IPC_CREATE 若是内核不存在与key相等的消息队列,则建立一个
   一个消息队列,若是存在这样的消息队列,返回该消息队列的描述符
* IPC_EXCL 和IPC_CREATE一块儿使用,若是对应键值的消息队列已
   经存在,则出错,返回-1

int msgsnd(int msgid, struct msgbuf* msgp. size_t msgsz, int magflg)
* 特别注意第三个参数,能够设置为0或IPC_NOWAIT, 当为0时,消息
    已满的时候会阻塞,若是为IPC_NOWAIT,则不等待当即返回,常见错
    误码有EAGAIN(消息队列已满),EIDRN(消息队列已被删除)
    EACCESS(没法访问消息对列)
int msgrcv(int msgid, struct msgbuff* , size_t msgsz, long int msgtype, int msgflag)
* msgflg 操做标志位,IPC_NOWAIT(若是没有知足条件的消息立马返回) IPC_EXCEPT 
(返回队列中第一个类型不会msgtype的消息),IPC_NOERROR(若是队列中知足条件的消息
内容大于所请求的实际字节数,则把该消息截断,截断部分将被丢弃)
int msgctl(int msgid, int cmd, struct msgid_ds*)
*cmd有如下三个参数:
*IPC_STAT(获取消息队列对应的msgid_ds数据结构)
*IPC_SET(设置消息队列的属性)
*IPC_RMID(从内核中删除msgid标识的消息队列)

(2)示例express

// linux系统的消息队列使用
#include "stdio.h"

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdint.h>
#include <errno.h>

#include <string>
#include <string.h>
#include <iostream>
using namespace std;

const int32_t BUFFSIZE = 256;
struct msgBuff
{
    long msgType;
    char buff[BUFFSIZE];
};

int main()
{
    int32_t proj_id = 32;
    key_t key = ftok("./messagekey", proj_id);
    if(-1 == key) 
    {
        cout << "ftok error" <<endl;
    }

    int msgid = msgget(key, IPC_CREAT);
    if (msgid == -1)
    {
        cout << "msgget error" << endl;
    }

    msgBuff msg;
    memset(&msg, 0, sizeof(msgBuff));
    msg.msgType = 3;
    strcpy(msg.buff, "message tset");
    cout << msg.buff << endl;
    int32_t msgLen = sizeof(msgBuff) - sizeof(long);
    cout << "msgLen:" << msgLen << endl;

    int nsize;
    if  ((nsize = msgsnd(msgid, &msg, msgLen, 0)) == -1){
        cout << strerror(errno) << endl;
    }
    
    return 0;
}
SendMsg.c
// linux系统的消息队列使用
#include "stdio.h"

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdint.h>
#include <errno.h>

#include <string>
#include <string.h>
#include <iostream>
using namespace std;

#include <thread>
#include <chrono>

const int32_t BUFFSIZE = 256;
struct msgBuff
{
    long msgType;
    char buff[BUFFSIZE];
};

int main()
{
    int32_t proj_id = 32;
    key_t key = ftok("./messagekey", proj_id);
    if(-1 == key) 
    {
        cout << "ftok error" <<endl;
    }

    int msgid = msgget(key, IPC_CREAT);
    if (msgid == -1)
    {
        cout << "msgget error" << endl;
    }

    msgBuff msg;
    memset(&msg, 0, sizeof(msgBuff));
    int32_t msgLen = sizeof(msgBuff) - sizeof(long);
    int nsize;
    while(1)
    {   
        if  (msgrcv(msgid, &msg, msgLen, 0, 0) == -1){
            cout << strerror(errno) << endl;
        }

        cout  << msg.msgType << "--" << msg.buff <<endl;
        std::this_thread::sleep_for(std::chrono::milliseconds (2000));
    }
    return 0;
}
RcvMsg.c

1.2 自定义消息队列实现

(1)类图apache

(2)代码实现安全

#pragma once
#include <stdint.h>
class Message
{
public:
    struct Type
    {
        enum{Stop = 0};
    };
    Message()
    {
        
    }
    Message(int32_t type) :_type(type)
    {

    }

    virtual ~Message()
    {

    }

    int32_t GetType() const
    {
        return _type;
    }

    void SetType(int32_t type)
    {
        _type = type;
    }
private:
    int32_t _type;
};
Message.h
/*
    @线程安全的消息队列
*/
#pragma once
#include <mutex>
#include <memory>
#include <queue>
#include <condition_variable>

#include <iostream>
using namespace std;

template <class MSG>
class MessageQueue
{
public:
    MessageQueue():_mutex(), _condition(), _queue(){}
    MessageQueue(const MessageQueue&) = delete;
    const MessageQueue& operator=(const MessageQueue&) = delete;
    virtual ~MessageQueue(){};

    void Push(const MSG& msg)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(msg);
        cout << "add msg" << endl;
        _condition.notify_one();
    }
    bool Pop(MSG& msg, bool isBlocked = true) 
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if(isBlocked)
        {
            while(_queue.empty())
            {
                cout << "block state, MessageQueue is empty,please wait..." << endl;
                _condition.wait(lock);
            }
        }
        else
        {
            if(IsEmpty()) return false;
        }

        msg = std::move(_queue.front());
        _queue.pop();

        return true;
    }
    bool IsEmpty()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }
    int32_t Size()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.size();
    }

private:
    std::mutex _mutex;
    std::condition_variable _condition;
    std::queue<MSG> _queue;
};
MessageQueue.h
注意:unique_lock和lock_guard的使用,unique_lock不像lock_guard只能在析构时才释放锁,它能够随时释放锁,所以在wait时让unique_lock释放锁从语义上看更加准确,其次要防止死锁状况的出现,不要锁里面再等待锁

2. 线程池

  很少说,直接上代码数据结构

#pragma once

#include <thread>
#include <functional>
#include <stdint.h>
#include <vector>
#include <iostream>
using namespace std;

#include "MessageQueue.h"

#define MIN_THREADS 3

template <typename Type>
class ThreadPool{
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

public:
    ThreadPool(int32_t threads, std::function<void(Type &record)> handler);
    virtual ~ThreadPool();
    void Submit(Type record);

private:
    bool _shutdown;
    int32_t _threads;
    std::function<void(Type &record)> _handler;

    std::vector<std::thread> _workers;
    MessageQueue<Type> _tasks;
};

template <typename Type>
ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler)
    :_shutdown(false), _threads(threads), _handler(handler), _workers(), _tasks()
{
    if(_threads < MIN_THREADS)
    {
        _threads = MIN_THREADS;
    }

    for(int32_t i = 0; i < _threads; i++)
    {
        std::thread workThread([this]{
            while(!_shutdown)
            {
                Type record;
                bool ret = _tasks.Pop(record, true);
                _handler(record);
            }
        });
        workThread.detach();
        _workers.emplace_back(std::move(workThread));
    }
}

template <typename Type>
ThreadPool<Type>::~ThreadPool()
{
    for(std::thread &worker : _workers)
    {
        worker.join();
    }
}

template <typename Type>
void ThreadPool<Type>::Submit(Type record)
{
    _tasks.Push(record);
}

3. 事件循环

  以日志工具为例,为了实现高性能的日志工具,咱们必须确保日志I/O所有处于一个独立线程,而不会影响后续的操做,所以,实际上日志记录就是其它线程向日志线程发送日志消息,这样一来,事件循环模型就变得很是必要。app

3.1 类图设计

3.2 关键代码实现

(1)ByteArrayless

#pragma once

#include <vector>
#include <string>
#include <cstring>

class ByteArray : public std::vector<char> {
    public:
        ByteArray() = default;

        ByteArray(int32_t size) :
                std::vector<char>(size) {
        }

        ByteArray(const char *buffer, int32_t size) :
                std::vector<char>(buffer, buffer + size) {
        }

        ByteArray(const std::string &str) :
                std::vector<char>(str.size()) {
            memcpy(data(), str.c_str(), str.size());
        }

        std::string ToStdString() const {
            std::string result(this->cbegin(), this->cend());

            return result;
        }

        ByteArray &Concat(const ByteArray &buffer2) {
            size_t oldSize = size();
            size_t newSize = oldSize + buffer2.size();
            resize(newSize);
            memcpy(this->data() + oldSize, buffer2.data(), buffer2.size());

            return *this;
        }

        ByteArray operator+(const ByteArray &buffer2) const {
            ByteArray buffer1(this->size() + buffer2.size());
            memcpy(buffer1.data(), this->data(), this->size());
            memcpy(buffer1.data() + this->size(), buffer2.data(), buffer2.size());

            return buffer1;
        }
    };
View Code

(2)IStreamide

#pragma once

#include "ByteArray.h"

#include <functional>

class IStream {
    public:
        typedef std::function<void(const char* buf, int64_t size)> DataIndicationHandler;

        virtual int32_t Receive(char* buffer, int32_t bufferSize, int32_t& readSize) = 0;
        virtual int32_t Send(const ByteArray& byteArray) = 0;

        virtual void OnDataIndication(DataIndicationHandler handler) = 0;
        virtual DataIndicationHandler GetDataIndication() = 0;
    };

(3) BaseEvent函数

#pragma once
#include "ByteArray.h"
#include "IStream.h"

class BaseEvent
    {
    public:
        BaseEvent() { }

        BaseEvent(const std::string &type, const ByteArray &data,
                  IStream *stream) :
                _type(type), _data(data), _stream(stream)
        {
        }

        void SetData(const ByteArray &data)
        {
            _data = data;
        }

        const ByteArray &GetData() const
        {
            return _data;
        }

        void SetType(const std::string &type)
        {
            _type = type;
        }

        const std::string &GetType() const
        {
            return _type;
        }

        void SetStream(IStream *stream)
        {
            _stream = stream;
        }

        IStream *GetStream() const
        {
            return _stream;
        }

    private:
        std::string _type;
        ByteArray _data;
        IStream* _stream;
    };

(4)EventQueue

#pragma once

#include "BaseEvent.h"

#include <memory>
#include <mutex>
#include <condition_variable>
#include <chrono>

class EventQueue 
{
public:
    EventQueue(int timeout = 0) : _timeout(timeout) { }

    void PostEvent(BaseEvent *event)
    {
        std::unique_lock <std::mutex> locker(_mutex);

        _events.push_back(std::shared_ptr<BaseEvent>(event));
    }

    std::shared_ptr <BaseEvent> GetEvent()
    {
        std::unique_lock <std::mutex> locker(_mutex);

        if (_events.empty())
        {
            if (_timeout == 0)
            {
                return nullptr;
            }

            _waitCondition.wait_for(locker, std::chrono::milliseconds(_timeout));
        }

        if (!_events.empty())
        {
            std::shared_ptr <BaseEvent> event = _events.front();
            _events.erase(_events.begin());

            return event;
        }

        return nullptr;
    }

private:
    std::vector <std::shared_ptr<BaseEvent>> _events;
    std::mutex _mutex;
    std::condition_variable _waitCondition;
    // ms
    int _timeout;
};

(5)Loop

#pragma once

class Loop
{
public:
    void Start()
    {
        _Run();
    }

    virtual ~Loop(){}
private:
    virtual void _Run() = 0;
}

(6)EventQueueLoop

#pragma once
#include "Loop.h"
#include "EventQueue.h"
#include <memory>

class EventQueueLoop : public Loop
    {
    public:
        EventQueueLoop(EventQueue *queue);

        
    protected:
        virtual void _Run();

        virtual void OnEvent(std::shared_ptr <BaseEvent> event) = 0;

    private:
        EventQueue *_queue;
    };
#include "EventQueueLoop.h"

EventQueueLoop::EventQueueLoop(EventQueue *queue) : _queue(queue) 
{
}

void EventQueueLoop::_Run() 
{
    while (true) {
        std::shared_ptr <BaseEvent> event = _queue->GetEvent();
        if (!event) {
            continue;
        }

        OnEvent(event);
    }
}

4. 基于消息队列的日志实现

4.1 日志优先级

const std::string PRIORITY_STRING[] =
{
     "DEBUG",
     "CONFIG",
     "INFO",
     "WARNING",
     "ERROR"
};

4.2 日志格式

void Logger::WriteLog(Priority priority, const std::string &log) {
        if (priority < _priority)
            return;

        std::stringstream stream;
        stream << HurricaneUtils::GetCurrentTimeStamp()
        << " [" << PRIORITY_STRING[priority] << "] "
        << log;

        _queue.Push(stream.str());
    }
std::string GetCurrentTimeStamp()
{
    // get current time
    auto currentTime = std::chrono::system_clock::now();
    // get milliseconds
    auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime.time_since_epoch()) % 1000;

    auto currentTimePoint = std::chrono::system_clock::to_time_t(currentTime);

    // output the time stamp
    std::ostringstream stream;

#if (defined(WIN32) || defined(_WIN32) || defined(__WIN32__)) && !defined(__MINGW32__)
    stream << std::put_time(std::localtime(&currentTimePoint), "%T");
#else
    char buffer[80];
    auto success = std::strftime(buffer, 80, "%T", std::localtime(&currentTimePoint));  // %T显示时分秒
    assert(0 != success);
    stream << buffer;
#endif

    stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count();

    return stream.str();
}

4.3 代码实现

/**
 * licensed to the apache software foundation (asf) under one
 * or more contributor license agreements.  see the notice file
 * distributed with this work for additional information
 * regarding copyright ownership.  the asf licenses this file
 * to you under the apache license, version 2.0 (the
 * "license"); you may not use this file except in compliance
 * with the license.  you may obtain a copy of the license at
 *
 * http://www.apache.org/licenses/license-2.0
 *
 * unless required by applicable law or agreed to in writing, software
 * distributed under the license is distributed on an "as is" basis,
 * without warranties or conditions of any kind, either express or implied.
 * see the license for the specific language governing permissions and
 * limitations under the license.
 */
#pragma once

#include "MessageQueue.h"
#include <memory>
#include <thread>
#include <queue>
#include <string>
#include <fstream>

enum Priority {
        DEBUG,
        STATE,
        INFO,
        WARNING,
        FAULT
    };

class Logger {
    Logger &operator=(const Logger &) = delete;

    Logger(const Logger &other) = delete;

public:
    static Logger *Get();

    void SetPriority(Priority priority);

    Priority GetPriority();

    void WriteLog(Priority priority, const std::string &log);

private:
    Logger(Priority priority);

    virtual ~Logger();

    void _InitializeFileStream();

    void _WriteThread();

    std::string GetCurrentTimeStamp();

private:
    MessageQueue <std::string> _queue;
    std::ofstream *_fileStream;
    Priority _priority;
    bool _shutdown;
};

#define TRACE_DEBUG(LOG_CONTENT) Logger::Get()->WriteLog(DEBUG, LOG_CONTENT);
#define TRACE_STATE(LOG_CONTENT) Logger::Get()->WriteLog(STATE, LOG_CONTENT);
#define TRACE_INFO(LOG_CONTENT) Logger::Get()->WriteLog(INFO, LOG_CONTENT);
#define TRACE_WARNING(LOG_CONTENT) Logger::Get()->WriteLog(WARNING, LOG_CONTENT);
#define TRACE_ERROR(LOG_CONTENT) Logger::Get()->WriteLog(FAULT, LOG_CONTENT);
Logger.h
/**
 * licensed to the apache software foundation (asf) under one
 * or more contributor license agreements.  see the notice file
 * distributed with this work for additional information
 * regarding copyright ownership.  the asf licenses this file
 * to you under the apache license, version 2.0 (the
 * "license"); you may not use this file except in compliance
 * with the license.  you may obtain a copy of the license at
 *
 * http://www.apache.org/licenses/license-2.0
 *
 * unless required by applicable law or agreed to in writing, software
 * distributed under the license is distributed on an "as is" basis,
 * without warranties or conditions of any kind, either express or implied.
 * see the license for the specific language governing permissions and
 * limitations under the license.
 */

#include "logger.h"
#include <iostream>
#include <sstream>
#include <iomanip>

const std::string PRIORITY_STRING[] =
{
    "DEBUG",
    "CONFIG",
    "INFO",
    "WARNING",
    "ERROR"
};

Logger *Logger::Get() {
    static Logger logger(DEBUG);
    return &logger;
}

Logger::Logger(Priority priority) : _queue(), _fileStream(nullptr), _shutdown(false) {
    _priority = priority;
    _InitializeFileStream();
    auto func = std::bind(&Logger::_WriteThread, this);
    std::thread writeThread(func);
    writeThread.detach();
}

Logger::~Logger() {
    _shutdown = true;

    if (nullptr != _fileStream) {
        _fileStream->close();
        delete _fileStream;
        _fileStream = nullptr;
    }
}

std::string Logger::GetCurrentTimeStamp()
{
    // get current time
    auto currentTime = std::chrono::system_clock::now();
    // get milliseconds
    auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime.time_since_epoch()) % 1000;

    auto currentTimePoint = std::chrono::system_clock::to_time_t(currentTime);

    // output the time stamp
    std::ostringstream stream;

    char buffer[80];
    auto success = std::strftime(buffer, 80, "%T", std::localtime(&currentTimePoint));  // %T显示时分秒
    stream << buffer;
    //assert(0 != success);
    //stream << '.' << std::setfill('0') << std::setw(3) << milliseconds.count();

    return stream.str();
}

void Logger::SetPriority(Priority priority) {
    _priority = priority;
}

Priority Logger::GetPriority() {
    return _priority;
}

void Logger::_InitializeFileStream() {
    // Prepare fileName
    std::string fileName = "./logger.log";

    // Initialize file stream
    _fileStream = new std::ofstream();
    std::ios_base::openmode mode = std::ios_base::out;
    mode |= std::ios_base::trunc;
    _fileStream->open(fileName, mode);

    // Error handling
    if (!_fileStream->is_open()) {
        // Print error information
        std::ostringstream ss_error;
        ss_error << "FATAL ERROR:  could not Open log file: [" << fileName << "]";
        ss_error << "\n\t\t std::ios_base state = " << _fileStream->rdstate();
        std::cerr << ss_error.str().c_str() << std::endl << std::flush;

        // Cleanup
        _fileStream->close();
        delete _fileStream;
        _fileStream = nullptr;
    }
}

void Logger::WriteLog(Priority priority, const std::string &log) {
    if (priority < _priority)
        return;
    std::stringstream stream;
    stream << GetCurrentTimeStamp()
    << " [" << PRIORITY_STRING[priority] << "] "
    << log;

    _queue.Push(stream.str());
}

void Logger::_WriteThread() {
    while (!_shutdown) {
        std::string log;
        _queue.Pop(log, true);

        //std::cout << log << std::endl;

        if (_fileStream)
            *_fileStream << log << std::endl;
    }
}
Logger.cpp

 测试:

#include "stdio.h"

#include <thread>
#include <chrono>

#include "../logger.h"
int main()
{
    TRACE_DEBUG("Logger Debug test");

    while(1)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
    return 0;
}