量化投资入门-比特币走势监控

总体思路

通过api接口在指定的时间间隔内查询当前比特币的美元市场价(数据来源于coindesk),以最近的两小时作为观测窗口,一旦发现当前价格波动大于**5%**则会发出邮件提醒。

依赖库

import smtplib
from email.mime.text import MIMEText
import numpy as np
from datetime import datetime, timedelta
import time

import requests

读取数据

def get_bitcoin():
    url = "https://api.coindesk.com/v1/bpi/currentprice.json"

    headers = {
        'User-Agent': "PostmanRuntime/7.13.0",
        'Accept': "*/*",
        'Cache-Control': "no-cache",
        'Postman-Token': "8d7657ec-1f45-41ab-a214-39429044ba46,c24ace01-c2b5-4dc4-80b4-b6109b8f8e15",
        'Host': "api.coindesk.com",
        'accept-encoding': "gzip, deflate",
        'Connection': "keep-alive",
        'cache-control': "no-cache"
    }

    response = requests.request("GET", url, headers=headers)
    bitcoin_usd = round(response.json()['bpi']['USD']['rate_float'], 3)

    return bitcoin_usd

通过coindesk.com提供的数据api进行实时访问。返回的数据是json格式,这里我们需要根据json结构提取关键信息。

{"time":{"updated":"May 21, 2019 13:02:00 UTC","updatedISO":"2019-05-21T13:02:00+00:00","updateduk":"May 21, 2019 at 14:02 BST"},"disclaimer":"This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org","chartName":"Bitcoin","bpi":{"USD":{"code":"USD","symbol":"$","rate":"7,912.8333","description":"United States Dollar","rate_float":7912.8333},"GBP":{"code":"GBP","symbol":"£","rate":"6,229.2040","description":"British Pound Sterling","rate_float":6229.204},"EUR":{"code":"EUR","symbol":"€","rate":"7,098.4129","description":"Euro","rate_float":7098.4129}}}

我们需要的其实是['bpi']['USD']['rate_float']下面的float数值

分析数据

def monitoring(bitcoin_usd, freq):
    history_list.append(bitcoin_usd)
    if len(history_list) > freq:
        history_list.pop(0)

    arr_mean = round(float(np.mean(history_list)), 3)
    arr_var = round(float(np.var(history_list)), 3)
    arr_std = round(float(np.std(history_list)), 3)

    min_value = min(history_list)
    max_value = max(history_list)

    change_rate = round(100 * (max_value - min_value) / arr_mean, 2)

    return arr_mean, arr_var, arr_std, change_rate

下面这一段其实是一个堆栈,freq是在这个case里会等于120,因为我们假设一分钟取一次数据,2小时就是120次。这段代码就能实现我们永远聚焦在最近的2小时内。

history_list.append(bitcoin_usd)
    if len(history_list) > freq:
        history_list.pop(0)

变化率这里其实我用的方法很暴力,不是很严谨。我选用的是两小时内最高和最低的波动值相对两小时的变化率,这里其实还有优化空间。之后可能会用股票里的beta值作为判断基础。

arr_mean = round(float(np.mean(history_list)), 3)
    arr_var = round(float(np.var(history_list)), 3)
    arr_std = round(float(np.std(history_list)), 3)

    min_value = min(history_list)
    max_value = max(history_list)

    change_rate = round(100 * (max_value - min_value) / arr_mean, 2)

告警:邮件发送

def send_mail(receiver_addresses, content, change_rate,  arr_mean, arr_var, arr_std):

    # Please check the following website to set your own email config
    # https://www.runoob.com/python/python-email.html

    host = 'smtp.163.com'  # Your own email host
    port = 25
    sender = '[email protected]'  # Your own email
    pwd = '*********'  # Your own password
    body = '<h2>温馨提醒:</h2>' \
           '<h3> 当前美元价格: {}</h3>' \
           '<h3> 折合人民币价格(汇率=7): {} </h3>' \
           '<p> 2小时内变化率: {} </p>' \
           '<p> 2小时内平均值: {} </p>' \
           '<p> 2小时内反差: {} </p>' \
           '<p> 2小时内标准差: {} </p>'.format(content, content * 7, change_rate,  arr_mean, arr_var, arr_std)

    msg = MIMEText(body, 'html', "utf-8")
    msg['Subject'] = '比特币波动提醒!'
    msg['From'] = sender

    # Sometimes STMP server might block your request for no reason.
    # The solution is to include the sender's email address to the receivers' list.

    msg['To'] = '{}, [email protected]'.format(receiver_addresses)   # Add sender's email
    s = smtplib.SMTP(host, port)
    s.login(sender, pwd)
    s.sendmail(msg["From"], msg["To"].split(","), msg.as_string())

这里我们主要用stmp的依赖包实现邮件的发送,详细的介绍可以在runnoob.com上看到,非常详细。有两点要注意:

  1. host, port, pwd, sender都要根据自己的邮箱情况更改,具体方法百度一下或者上面的runnoob都有写,主要就是开通第三方使用权限。
  2. msg['To'] = '{}, [email protected]'.format(receiver_addresses),这里之所以要把自己的地址加进去,主要是避免一个防垃圾邮件的报错(如果标题里有test字样也容易报错),不加的话就会莫名报错。别纠结那么多,加上就行了。。。

main函数

def main():

    today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    # receivers = '[email protected]'
    receivers = '[email protected]'
    frequency = 120

    bitcoin_usd = get_bitcoin()

    arr_mean, arr_var, arr_std, change_rate = monitoring(bitcoin_usd, frequency)
    print('BitCoin: \t {} \t Current Price: ${} \t Change Rate: {}% \t 2HR Average: ${}'.format(today, bitcoin_usd,
                                                                                                change_rate, arr_mean))
    if change_rate > 5:
        send_mail(receivers, bitcoin_usd, arr_mean, arr_var, arr_std, change_rate)

frequency = 120,为的是只看最近2小时
代码会每分钟更新一次数据,这个也可以后期更改。

注意⚠️已知存在的小问题

  • change rate很随意,不一定合理,有改善空间
  • 发邮件的频率现在没有做限制,之后有空会更新一下,改为一小时最多发送一次,不然邮件也太多了。。。

结果展示

  • 运行界面

在这里插入图片描述

  • 邮件界面
    在这里插入图片描述

完整代码

import smtplib
from email.mime.text import MIMEText
import numpy as np
from datetime import datetime, timedelta
import time

import requests


def send_mail(receiver_addresses, content, change_rate,  arr_mean, arr_var, arr_std):

    # Please check the following website to set your own email config
    # https://www.runoob.com/python/python-email.html

    host = 'smtp.163.com'  # Your own email host
    port = 25
    sender = '[email protected]'  # Your own email
    pwd = '*********'  # Your own password
    body = '<h2>温馨提醒:</h2>' \
           '<h3> 当前美元价格: {}</h3>' \
           '<h3> 折合人民币价格(汇率=7): {} </h3>' \
           '<p> 2小时内变化率: {} </p>' \
           '<p> 2小时内平均值: {} </p>' \
           '<p> 2小时内反差: {} </p>' \
           '<p> 2小时内标准差: {} </p>'.format(content, content * 7, change_rate,  arr_mean, arr_var, arr_std)

    msg = MIMEText(body, 'html', "utf-8")
    msg['Subject'] = '比特币波动提醒!'
    msg['From'] = sender

    # Sometimes STMP server might block your request for no reason.
    # The solution is to include the sender's email address to the receivers' list.

    msg['To'] = '{}, [email protected]'.format(receiver_addresses)   # Add sender's email
    s = smtplib.SMTP(host, port)
    s.login(sender, pwd)
    s.sendmail(msg["From"], msg["To"].split(","), msg.as_string())


def get_bitcoin():
    url = "https://api.coindesk.com/v1/bpi/currentprice.json"

    headers = {
        'User-Agent': "PostmanRuntime/7.13.0",
        'Accept': "*/*",
        'Cache-Control': "no-cache",
        'Postman-Token': "8d7657ec-1f45-41ab-a214-39429044ba46,c24ace01-c2b5-4dc4-80b4-b6109b8f8e15",
        'Host': "api.coindesk.com",
        'accept-encoding': "gzip, deflate",
        'Connection': "keep-alive",
        'cache-control': "no-cache"
    }

    response = requests.request("GET", url, headers=headers)
    bitcoin_usd = round(response.json()['bpi']['USD']['rate_float'], 3)

    return bitcoin_usd


def monitoring(bitcoin_usd, freq):
    history_list.append(bitcoin_usd)
    if len(history_list) > freq:
        history_list.pop(0)

    arr_mean = round(float(np.mean(history_list)), 3)
    arr_var = round(float(np.var(history_list)), 3)
    arr_std = round(float(np.std(history_list)), 3)

    min_value = min(history_list)
    max_value = max(history_list)

    change_rate = round(100 * (max_value - min_value) / arr_mean, 2)

    return arr_mean, arr_var, arr_std, change_rate


def main():

    today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    # receivers = '[email protected]'
    receivers = '[email protected]'
    frequency = 120

    bitcoin_usd = get_bitcoin()

    arr_mean, arr_var, arr_std, change_rate = monitoring(bitcoin_usd, frequency)
    print('BitCoin: \t {} \t Current Price: ${} \t Change Rate: {}% \t 2HR Average: ${}'.format(today, bitcoin_usd,
                                                                                                change_rate, arr_mean))
    if change_rate > 5:
        send_mail(receivers, bitcoin_usd, arr_mean, arr_var, arr_std, change_rate)


if __name__ == '__main__':
    history_list = list()
    while True:

        dt = datetime.now() + timedelta(minutes=1)
        dt = dt.replace(second=0)

        try:
            main()
        except Exception as e:
            print(e)

        while datetime.now() < dt:
            time.sleep(1)