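For work reasons, I will be spending a lot of time with ClickHouse, the open-source OLAP database. Yandex open-sourced this analytical database on June 15, 2016, and it is widely praised for its strong single-node processing performance.
While testing ClickHouse and reading its source code, I came to admire this database built by the "fighting nation". ClickHouse is not only excellent study material for database internals; it is also written with many C++17 features, which makes it a large-scale teaching resource for modern C++.
I plan to share what I learn from reading the ClickHouse source as a series of source-reading notes. Frankly, notes like these are hard to write: a little more detail and they become tedious, a little less and they become vague.
In this first post, let's start with how aggregate functions are implemented. All aboard!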
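Aggregate function: as the name suggests, a function that performs an aggregate computation over a group of values and returns the result.
Such functions are very common in databases, for example count, max, min, and sum.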
    /** Adds a value into aggregation data on which place points to.
     *  columns points to columns containing arguments of aggregation function.
     *  row_num is number of row which should be added.
     *  Additional parameter arena should be used instead of standard memory allocator if the addition requires memory allocation.
     */
    virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;

    /// Merges state (on which place points to) with other state of current aggregation function.
    virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

    /// Serializes state (to transmit it over the network, for example).
    virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0;

    /// Deserializes state. This function is called only for empty (just created) states.
    virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;

    /** Contains a loop with calls to "add" function. You can collect arguments into array "places"
     *  and do a single call to "addBatch" for devirtualization and inlining.
     */
    virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;
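To make the division of labour between add, merge, serialize, and deserialize concrete, here is a minimal, self-contained sketch of the same lifecycle. Everything in it (ToySum, ToySumState, the ToyDataPtr aliases, the std::vector used as a column and as a byte buffer) is a hypothetical stand-in invented for illustration; it is not ClickHouse code and it leaves out Arena and addBatch entirely.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <new>
#include <vector>

// Hypothetical stand-ins for AggregateDataPtr / ConstAggregateDataPtr.
using ToyDataPtr = char *;
using ToyConstDataPtr = const char *;

// The aggregation state; it lives in memory supplied by the caller.
struct ToySumState
{
    std::int64_t sum = 0;
};

// Simplified model of the interface: the function object itself is stateless.
struct IToyAggregateFunction
{
    virtual ~IToyAggregateFunction() = default;
    virtual void create(ToyDataPtr place) const = 0;
    virtual void add(ToyDataPtr place, const std::vector<std::int64_t> & column, std::size_t row_num) const = 0;
    virtual void merge(ToyDataPtr place, ToyConstDataPtr rhs) const = 0;
    virtual void serialize(ToyConstDataPtr place, std::vector<char> & buf) const = 0;
    virtual void deserialize(ToyDataPtr place, const std::vector<char> & buf) const = 0;
    virtual std::int64_t result(ToyConstDataPtr place) const = 0;
};

struct ToySum final : IToyAggregateFunction
{
    static ToySumState & data(ToyDataPtr place) { return *reinterpret_cast<ToySumState *>(place); }
    static const ToySumState & data(ToyConstDataPtr place) { return *reinterpret_cast<const ToySumState *>(place); }

    void create(ToyDataPtr place) const override { new (place) ToySumState; }

    void add(ToyDataPtr place, const std::vector<std::int64_t> & column, std::size_t row_num) const override
    {
        assert(row_num < column.size());
        data(place).sum += column[row_num];
    }

    void merge(ToyDataPtr place, ToyConstDataPtr rhs) const override { data(place).sum += data(rhs).sum; }

    void serialize(ToyConstDataPtr place, std::vector<char> & buf) const override
    {
        buf.resize(sizeof(std::int64_t));
        std::memcpy(buf.data(), &data(place).sum, sizeof(std::int64_t));
    }

    void deserialize(ToyDataPtr place, const std::vector<char> & buf) const override
    {
        assert(buf.size() == sizeof(std::int64_t));
        std::memcpy(&data(place).sum, buf.data(), sizeof(std::int64_t));
    }

    std::int64_t result(ToyConstDataPtr place) const override { return data(place).sum; }
};

// Two partial states are filled separately; one is shipped as bytes and merged into the other.
inline std::int64_t toyTwoStageSum()
{
    const ToySum func;
    const std::vector<std::int64_t> column{1, 2, 3, 4, 5};

    alignas(ToySumState) char left[sizeof(ToySumState)];
    alignas(ToySumState) char right[sizeof(ToySumState)];
    alignas(ToySumState) char received[sizeof(ToySumState)];
    func.create(left);
    func.create(right);

    for (std::size_t row = 0; row < 3; ++row)
        func.add(left, column, row);   // rows 0..2 go to the first state
    for (std::size_t row = 3; row < 5; ++row)
        func.add(right, column, row);  // rows 3..4 go to the second state

    std::vector<char> wire;
    func.serialize(right, wire);       // pretend this crosses the network
    func.create(received);             // deserialize expects a freshly created state
    func.deserialize(received, wire);

    func.merge(left, received);
    return func.result(left);
}
```

The point of the sketch is that the function object never owns data: every method receives the state through a raw pointer, which is what lets the caller decide where states live and how many of them exist.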
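Concrete column types such as ColumnUInt8 and ColumnArray all implement the column interface, and each subclass provides its own concrete memory layout. columns is an array of column pointers, so columns[0] yields the first column. (Only one column is involved here, so why is columns passed as an array? As the interface comment says, it points to the columns that hold the arguments of the aggregate function, one entry per argument, and every aggregate function, including those that work on array columns, goes through this same interface.) Calling the getData method implemented by the IColumn subclass, taking the value at row row_num, and passing it to add completes one step of the aggregate computation.

    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast<const ColVecType &>(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }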
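The columns[0] plus getData()[row_num] pattern can be tried out in isolation. The sketch below is only an illustration under stated assumptions: ToyColumn, ToyColumnVector, ToySumState, and toyAdd are hypothetical names made up here, and a std::vector stands in for the real column storage.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for IColumn: a type-erased base class.
struct ToyColumn
{
    virtual ~ToyColumn() = default;
};

// Hypothetical stand-in for ColumnVector<T>: contiguous values of one type.
template <typename T>
struct ToyColumnVector final : ToyColumn
{
    explicit ToyColumnVector(std::vector<T> v) : values(std::move(v)) {}
    const std::vector<T> & getData() const { return values; }
    std::vector<T> values;
};

template <typename T>
struct ToySumState
{
    T sum{};
    void add(T value) { sum += value; }
};

// columns holds one pointer per argument of the aggregate function;
// a single-argument function such as sum only looks at columns[0].
template <typename T>
void toyAdd(ToySumState<T> & state, const ToyColumn ** columns, std::size_t row_num)
{
    // The caller guarantees the dynamic type, so a static_cast is enough.
    const auto & column = static_cast<const ToyColumnVector<T> &>(*columns[0]);
    assert(row_num < column.getData().size());
    state.add(column.getData()[row_num]);
}

inline long toySumFirstArgument()
{
    const ToyColumnVector<long> prices(std::vector<long>{10, 20, 30});
    const ToyColumnVector<long> ignored(std::vector<long>{7, 7, 7});
    const ToyColumn * columns[] = {&prices, &ignored};  // second entry is never read

    ToySumState<long> state;
    for (std::size_t row = 0; row < 3; ++row)
        toyAdd<long>(state, columns, row);
    return state.sum;
}
```

Note that the cast is unchecked: the design relies on the function having been created for exactly this argument type, which is why type resolution happens once, at creation time, rather than per row.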
addFree implements the process described above, but it is a private function, so we normally obtain the address of the function through getAddressOfAddFunction. This can improve execution efficiency by roughly 20% during aggregate queries.

    template <typename Derived>
    class IAggregateFunctionHelper : public IAggregateFunction
    {
    private:
        static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
        {
            static_cast<const Derived &>(*that).add(place, columns, row_num, arena);
        }

    public:
        IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
            : IAggregateFunction(argument_types_, parameters_) {}

        AddFunc getAddressOfAddFunction() const override { return &addFree; }
        // ... (the excerpt ends here)
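The trick is a CRTP helper that exposes a plain function pointer, so the hot loop pays for one virtual call up front instead of one per row. Here is a minimal sketch of that idea; IToyFunction, ToyFunctionHelper, ToySum, and the simplified AddFunc signature are hypothetical, and whether the compiler actually inlines the call inside addFree depends on the compiler and optimization level.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct IToyFunction
{
    // A plain function pointer type, analogous in spirit to AddFunc.
    using AddFunc = void (*)(const IToyFunction *, long & state, const std::vector<long> & column, std::size_t row_num);

    virtual ~IToyFunction() = default;
    virtual void add(long & state, const std::vector<long> & column, std::size_t row_num) const = 0;
    virtual AddFunc getAddressOfAddFunction() const = 0;
};

// CRTP helper: Derived is known statically, so the call inside addFree
// targets Derived::add directly and is a candidate for inlining.
template <typename Derived>
class ToyFunctionHelper : public IToyFunction
{
private:
    static void addFree(const IToyFunction * that, long & state, const std::vector<long> & column, std::size_t row_num)
    {
        static_cast<const Derived &>(*that).add(state, column, row_num);
    }

public:
    AddFunc getAddressOfAddFunction() const override { return &addFree; }
};

class ToySum final : public ToyFunctionHelper<ToySum>
{
public:
    void add(long & state, const std::vector<long> & column, std::size_t row_num) const override
    {
        assert(row_num < column.size());
        state += column[row_num];
    }
};

inline long toySumViaFunctionPointer(const IToyFunction & func, const std::vector<long> & column)
{
    // One virtual call to fetch the pointer, then only direct calls in the loop.
    const IToyFunction::AddFunc add = func.getAddressOfAddFunction();
    long state = 0;
    for (std::size_t row = 0; row < column.size(); ++row)
        add(&func, state, column, row);
    return state;
}
```

Keeping addFree private means callers cannot bypass the interface; the only way to reach it is the pointer handed out by getAddressOfAddFunction.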
    class AggregateFunctionFactory final : private boost::noncopyable, public IFactoryWithAliases<AggregateFunctionCreator>
    {
    public:
        static AggregateFunctionFactory & instance();

        /// Register a function by its name.
        /// No locking, you must register all functions before usage of get.
        void registerFunction(
            const String & name,
            Creator creator,
            CaseSensitiveness case_sensitiveness = CaseSensitive);

        /// Throws an exception if not found.
        AggregateFunctionPtr get(
            const String & name,
            const DataTypes & argument_types,
            const Array & parameters = {},
            int recursion_level = 0) const;
        // ... (the excerpt ends here)
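A name-to-creator registry with optional case-insensitive lookup is easy to model on its own. The following is a rough sketch of that idea only: ToyFactory, ToyCreator, ToyNamedFunction, and registerToySum are hypothetical, the singleton instance(), aliases, argument types, and parameters are all omitted, and the lookup order is an assumption of this sketch rather than something taken from the excerpt.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>

struct IToyFunction
{
    virtual ~IToyFunction() = default;
    virtual std::string getName() const = 0;
};
using ToyFunctionPtr = std::shared_ptr<IToyFunction>;
using ToyCreator = std::function<ToyFunctionPtr(const std::string & name)>;

struct ToyNamedFunction final : IToyFunction
{
    explicit ToyNamedFunction(std::string n) : name(std::move(n)) {}
    std::string getName() const override { return name; }
    std::string name;
};

class ToyFactory
{
public:
    enum CaseSensitiveness { CaseSensitive, CaseInsensitive };

    // No locking: register everything before the first call to get.
    void registerFunction(const std::string & name, ToyCreator creator, CaseSensitiveness cs = CaseSensitive)
    {
        if (!functions.emplace(name, creator).second)
            throw std::logic_error("function name is not unique: " + name);
        if (cs == CaseInsensitive)
            case_insensitive_functions.emplace(toLower(name), std::move(creator));
    }

    // Exact match first, then the lower-cased table; throws if neither hits.
    ToyFunctionPtr get(const std::string & name) const
    {
        if (auto it = functions.find(name); it != functions.end())
            return it->second(name);
        if (auto it = case_insensitive_functions.find(toLower(name)); it != case_insensitive_functions.end())
            return it->second(name);
        throw std::out_of_range("unknown aggregate function: " + name);
    }

private:
    static std::string toLower(std::string s)
    {
        std::transform(s.begin(), s.end(), s.begin(),
                       [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
        return s;
    }

    std::unordered_map<std::string, ToyCreator> functions;
    std::unordered_map<std::string, ToyCreator> case_insensitive_functions;
};

inline void registerToySum(ToyFactory & factory)
{
    factory.registerFunction(
        "sum", [](const std::string &) { return std::make_shared<ToyNamedFunction>("sum"); },
        ToyFactory::CaseInsensitive);
    factory.registerFunction(
        "sumWithOverflow", [](const std::string &) { return std::make_shared<ToyNamedFunction>("sumWithOverflow"); });
}
```

With this sketch, get("SUM") resolves to the sum function because it was registered as case-insensitive, while get("SUMWITHOVERFLOW") throws because sumWithOverflow was registered with the default, case-sensitive setting.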
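With that background in place, let's walk through an example: the implementation details of one aggregate function and how it gets used.
I picked a very simple aggregate operator, Sum; let's look at its code in detail.
Here we can see that AggregateFunctionSum is a final class and cannot be inherited from. It derives from IAggregateFunctionDataHelper, which is itself a subclass of the IAggregateFunctionHelper class mentioned above.
The key points are that this class overrides the getName method to return its name, sum, and implements the four core methods mentioned earlier: add, merge, serialize, and deserialize.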
    template <typename T, typename TResult, typename Data>
    class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>
    {
    public:
        using ResultDataType = std::conditional_t<IsDecimalNumber<T>, DataTypeDecimal<TResult>, DataTypeNumber<TResult>>;
        using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
        using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<TResult>, ColumnVector<TResult>>;

        String getName() const override { return "sum"; }

        AggregateFunctionSum(const DataTypes & argument_types_)
            : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
            , scale(0)
        {}

        AggregateFunctionSum(const IDataType & data_type, const DataTypes & argument_types_)
            : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
            , scale(getDecimalScale(data_type))
        {}

        DataTypePtr getReturnType() const override
        {
            if constexpr (IsDecimalNumber<T>)
                return std::make_shared<ResultDataType>(ResultDataType::maxPrecision(), scale);
            else
                return std::make_shared<ResultDataType>();
        }

        void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
        {
            const auto & column = static_cast<const ColVecType &>(*columns[0]);
            this->data(place).add(column.getData()[row_num]);
        }

        void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
        {
            this->data(place).merge(this->data(rhs));
        }

        void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
        {
            this->data(place).write(buf);
        }

        void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
        {
            this->data(place).read(buf);
        }

        void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
        {
            auto & column = static_cast<ColVecResult &>(to);
            column.getData().push_back(this->data(place).get());
        }

    private:
        UInt32 scale;
    };
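Next, ClickHouse provides two kinds of aggregation state for the sum: AggregateFunctionSumData and AggregateFunctionSumKahanData. The latter uses the Kahan algorithm to avoid precision loss with floating-point types; we can skip it for now and look directly at SumData. It is a template (declared as a struct). As described earlier, the methods of the aggregate function use the AggregateDataPtr pointer to reach the AggregateFunctionSumData instance and call its add to carry out the aggregation. We can see that AggregateFunctionSumData implements the four methods add, merge, write, and read, which line up one-to-one with add, merge, serialize, and deserialize in the interface.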
    template <typename T>
    struct AggregateFunctionSumData
    {
        T sum{};

        void add(T value) { sum += value; }

        void merge(const AggregateFunctionSumData & rhs) { sum += rhs.sum; }

        void write(WriteBuffer & buf) const { writeBinary(sum, buf); }

        void read(ReadBuffer & buf) { readBinary(sum, buf); }

        T get() const { return sum; }
    };
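Since the Kahan variant is skipped above, a small sketch may help show what it buys. The code below is a textbook Kahan (compensated) summation written for illustration; ToySumData, ToyKahanSumData, and toySumOnesOnto are hypothetical names, and this is not the body of AggregateFunctionSumKahanData. It assumes IEEE-754 single precision without fast-math style optimizations, which would defeat the compensation.

```cpp
#include <cassert>

// Plain accumulation, shaped like the simple sum state.
template <typename T>
struct ToySumData
{
    T sum{};
    void add(T value) { sum += value; }
    T get() const { return sum; }
};

// Textbook Kahan summation: carry the rounding error of each step forward.
template <typename T>
struct ToyKahanSumData
{
    T sum{};
    T compensation{};  // running estimate of the low-order bits lost so far

    void add(T value)
    {
        const T compensated = value - compensation;    // re-inject the error lost earlier
        const T new_sum = sum + compensated;           // low-order bits may be rounded away here
        compensation = (new_sum - sum) - compensated;  // recover what was just rounded away
        sum = new_sum;
    }

    T get() const { return sum; }
};

// Starts from `start` and then adds 1.0f `count` times.
template <typename Data>
float toySumOnesOnto(float start, int count)
{
    assert(count >= 0);
    Data data;
    data.add(start);
    for (int i = 0; i < count; ++i)
        data.add(1.0f);
    return data.get();
}
```

Near 1e8 adjacent float values are 8 apart, so in the plain version every added 1.0f is rounded away and the sum never moves. The compensated version keeps the lost amounts in compensation until they are large enough to register, which is why toySumOnesOnto<ToyKahanSumData<float>>(1e8f, 16) ends up above the plain result.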
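When the ClickHouse server starts, its main function calls the registerAggregateFunction initialization function to register all aggregate functions.
That in turn calls the following function: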
    void registerAggregateFunctionSum(AggregateFunctionFactory & factory)
    {
        factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive);
        factory.registerFunction("sumWithOverflow", createAggregateFunctionSum<AggregateFunctionSumWithOverflow>);
        factory.registerFunction("sumKahan", createAggregateFunctionSum<AggregateFunctionSumKahan>);
    }
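Here the call factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive); registers the aggregate function we looked at above. The template code involved is rather unpleasant, so I have simplified it and pulled out the relevant part of the creator function: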
    createAggregateFunctionSum(const std::string & name, const DataTypes & argument_types, const Array & parameters)
    {
        AggregateFunctionPtr res;
        DataTypePtr data_type = argument_types[0];
        if (isDecimal(data_type))
            res.reset(createWithDecimalType<Function>(*data_type, *data_type, argument_types));
        else
            res.reset(createWithNumericType<Function>(*data_type, argument_types));
        return res;
    }
The Function template parameter here is the AggregateFunctionSumSimple seen above, which is in turn defined by the following templates:
    template <typename T>
    using AggregateFunctionSumSimple = typename SumSimple<T>::Function;

    template <typename T>
    struct SumSimple
    {
        /// @note It uses slow Decimal128 (cause we need such a variant). sumWithOverflow is faster for Decimal32/64
        using ResultType = std::conditional_t<IsDecimalNumber<T>, Decimal128, NearestFieldType<T>>;
        using AggregateDataType = AggregateFunctionSumData<ResultType>;
        using Function = AggregateFunctionSum<T, ResultType, AggregateDataType>;
    };
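The alias chain is easier to follow on a cut-down model. In the sketch below every name is hypothetical, and ToyWidened in particular is an invented widening rule standing in for NearestFieldType, whose real definition is not shown in the excerpt; the Decimal branch is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <type_traits>

template <typename T>
struct ToySumData
{
    T sum{};
};

// Stand-in for the function class: it only records the types it was given.
template <typename T, typename TResult, typename Data>
struct ToySum
{
    using Argument = T;
    using Result = TResult;
    using State = Data;
};

// Hypothetical widening rule (an assumption of this sketch): floating point
// goes to double, signed integers to int64, unsigned integers to uint64.
template <typename T>
using ToyWidened = std::conditional_t<
    std::is_floating_point_v<T>, double,
    std::conditional_t<std::is_signed_v<T>, std::int64_t, std::uint64_t>>;

// One trait struct derives result type, state type, and function type from T.
template <typename T>
struct ToySumSimple
{
    using ResultType = ToyWidened<T>;
    using AggregateDataType = ToySumData<ResultType>;
    using Function = ToySum<T, ResultType, AggregateDataType>;
};

// The alias turns the trait into a one-parameter template, which is the shape
// a creator taking a template template parameter expects.
template <typename T>
using ToyAggregateFunctionSumSimple = typename ToySumSimple<T>::Function;

static_assert(std::is_same_v<
    ToyAggregateFunctionSumSimple<std::uint8_t>,
    ToySum<std::uint8_t, std::uint64_t, ToySumData<std::uint64_t>>>);
static_assert(std::is_same_v<ToyAggregateFunctionSumSimple<float>::Result, double>);

// Accumulating into the widened state avoids wrapping around at 255.
inline std::uint64_t toySumTwoBytes(std::uint8_t a, std::uint8_t b)
{
    ToySumSimple<std::uint8_t>::AggregateDataType state;
    state.sum += a;
    state.sum += b;
    return state.sum;
}
```

Read this way, the indirection has one job: the caller names only the argument type T, and the trait decides which result type and which state type go with it.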
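In case all this indirection has made you dizzy: in the end it simply creates, with new, an AggregateFunctionSum<T, ResultType, AggregateDataType>, which completes the registration of the sum operator; afterwards we can get it from the factory and happily call it. (The template transformations in this part are fairly involved; if they are hard to follow, go back to the source code and trace through them.)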
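That covers the basics of aggregate functions: how they are implemented, and how they are registered with and retrieved from the factory.
The other aggregate operators work in much the same way, so I won't go through them here; if you're interested, go back to the source code and explore further. With the implementation of aggregate functions covered, the next post will look at how aggregate functions are combined with columnar storage in ClickHouse to achieve vectorized execution.
I am a ClickHouse beginner; if you are interested in ClickHouse too, corrections and discussion are very welcome.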