通过api接口在指定的时间间隔内查询当前比特币的美元市场价(数据来源于coindesk),以最近的两小时作为观测窗口,一旦发现当前价格波动大于**5%**则会发出邮件提醒。
import smtplib from email.mime.text import MIMEText import numpy as np from datetime import datetime, timedelta import time import requests
def get_bitcoin(): url = "https://api.coindesk.com/v1/bpi/currentprice.json" headers = { 'User-Agent': "PostmanRuntime/7.13.0", 'Accept': "*/*", 'Cache-Control': "no-cache", 'Postman-Token': "8d7657ec-1f45-41ab-a214-39429044ba46,c24ace01-c2b5-4dc4-80b4-b6109b8f8e15", 'Host': "api.coindesk.com", 'accept-encoding': "gzip, deflate", 'Connection': "keep-alive", 'cache-control': "no-cache" } response = requests.request("GET", url, headers=headers) bitcoin_usd = round(response.json()['bpi']['USD']['rate_float'], 3) return bitcoin_usd
通过coindesk.com提供的数据api进行实时访问。返回的数据是json格式,这里我们需要根据json结构提取关键信息。
{"time":{"updated":"May 21, 2019 13:02:00 UTC","updatedISO":"2019-05-21T13:02:00+00:00","updateduk":"May 21, 2019 at 14:02 BST"},"disclaimer":"This data was produced from the CoinDesk Bitcoin Price Index (USD). Non-USD currency data converted using hourly conversion rate from openexchangerates.org","chartName":"Bitcoin","bpi":{"USD":{"code":"USD","symbol":"$","rate":"7,912.8333","description":"United States Dollar","rate_float":7912.8333},"GBP":{"code":"GBP","symbol":"£","rate":"6,229.2040","description":"British Pound Sterling","rate_float":6229.204},"EUR":{"code":"EUR","symbol":"€","rate":"7,098.4129","description":"Euro","rate_float":7098.4129}}}
我们需要的其实是['bpi']['USD']['rate_float']
下面的float数值
def monitoring(bitcoin_usd, freq): history_list.append(bitcoin_usd) if len(history_list) > freq: history_list.pop(0) arr_mean = round(float(np.mean(history_list)), 3) arr_var = round(float(np.var(history_list)), 3) arr_std = round(float(np.std(history_list)), 3) min_value = min(history_list) max_value = max(history_list) change_rate = round(100 * (max_value - min_value) / arr_mean, 2) return arr_mean, arr_var, arr_std, change_rate
下面这一段其实是一个堆栈,freq是在这个case里会等于120,因为我们假设一分钟取一次数据,2小时就是120次。这段代码就能实现我们永远聚焦在最近的2小时内。
history_list.append(bitcoin_usd) if len(history_list) > freq: history_list.pop(0)
变化率这里其实我用的方法很暴力,不是很严谨。我选用的是两小时内最高和最低的波动值相对两小时的变化率,这里其实还有优化空间。之后可能会用股票里的beta值作为判断基础。
arr_mean = round(float(np.mean(history_list)), 3) arr_var = round(float(np.var(history_list)), 3) arr_std = round(float(np.std(history_list)), 3) min_value = min(history_list) max_value = max(history_list) change_rate = round(100 * (max_value - min_value) / arr_mean, 2)
def send_mail(receiver_addresses, content, change_rate, arr_mean, arr_var, arr_std): # Please check the following website to set your own email config # https://www.runoob.com/python/python-email.html host = 'smtp.163.com' # Your own email host port = 25 sender = '[email protected]' # Your own email pwd = '*********' # Your own password body = '<h2>温馨提醒:</h2>' \ '<h3> 当前美元价格: {}</h3>' \ '<h3> 折合人民币价格(汇率=7): {} </h3>' \ '<p> 2小时内变化率: {} </p>' \ '<p> 2小时内平均值: {} </p>' \ '<p> 2小时内反差: {} </p>' \ '<p> 2小时内标准差: {} </p>'.format(content, content * 7, change_rate, arr_mean, arr_var, arr_std) msg = MIMEText(body, 'html', "utf-8") msg['Subject'] = '比特币波动提醒!' msg['From'] = sender # Sometimes STMP server might block your request for no reason. # The solution is to include the sender's email address to the receivers' list. msg['To'] = '{}, [email protected]'.format(receiver_addresses) # Add sender's email s = smtplib.SMTP(host, port) s.login(sender, pwd) s.sendmail(msg["From"], msg["To"].split(","), msg.as_string())
这里我们主要用stmp的依赖包实现邮件的发送,详细的介绍可以在runnoob.com上看到,非常详细。有两点要注意:
msg['To'] = '{}, [email protected]'.format(receiver_addresses)
,这里之所以要把自己的地址加进去,主要是避免一个防垃圾邮件的报错(如果标题里有test字样也容易报错),不加的话就会莫名报错。别纠结那么多,加上就行了。。。def main(): today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # receivers = '[email protected]' receivers = '[email protected]' frequency = 120 bitcoin_usd = get_bitcoin() arr_mean, arr_var, arr_std, change_rate = monitoring(bitcoin_usd, frequency) print('BitCoin: \t {} \t Current Price: ${} \t Change Rate: {}% \t 2HR Average: ${}'.format(today, bitcoin_usd, change_rate, arr_mean)) if change_rate > 5: send_mail(receivers, bitcoin_usd, arr_mean, arr_var, arr_std, change_rate)
frequency = 120,为的是只看最近2小时
代码会每分钟更新一次数据,这个也可以后期更改。
注意⚠️已知存在的小问题
import smtplib from email.mime.text import MIMEText import numpy as np from datetime import datetime, timedelta import time import requests def send_mail(receiver_addresses, content, change_rate, arr_mean, arr_var, arr_std): # Please check the following website to set your own email config # https://www.runoob.com/python/python-email.html host = 'smtp.163.com' # Your own email host port = 25 sender = '[email protected]' # Your own email pwd = '*********' # Your own password body = '<h2>温馨提醒:</h2>' \ '<h3> 当前美元价格: {}</h3>' \ '<h3> 折合人民币价格(汇率=7): {} </h3>' \ '<p> 2小时内变化率: {} </p>' \ '<p> 2小时内平均值: {} </p>' \ '<p> 2小时内反差: {} </p>' \ '<p> 2小时内标准差: {} </p>'.format(content, content * 7, change_rate, arr_mean, arr_var, arr_std) msg = MIMEText(body, 'html', "utf-8") msg['Subject'] = '比特币波动提醒!' msg['From'] = sender # Sometimes STMP server might block your request for no reason. # The solution is to include the sender's email address to the receivers' list. msg['To'] = '{}, [email protected]'.format(receiver_addresses) # Add sender's email s = smtplib.SMTP(host, port) s.login(sender, pwd) s.sendmail(msg["From"], msg["To"].split(","), msg.as_string()) def get_bitcoin(): url = "https://api.coindesk.com/v1/bpi/currentprice.json" headers = { 'User-Agent': "PostmanRuntime/7.13.0", 'Accept': "*/*", 'Cache-Control': "no-cache", 'Postman-Token': "8d7657ec-1f45-41ab-a214-39429044ba46,c24ace01-c2b5-4dc4-80b4-b6109b8f8e15", 'Host': "api.coindesk.com", 'accept-encoding': "gzip, deflate", 'Connection': "keep-alive", 'cache-control': "no-cache" } response = requests.request("GET", url, headers=headers) bitcoin_usd = round(response.json()['bpi']['USD']['rate_float'], 3) return bitcoin_usd def monitoring(bitcoin_usd, freq): history_list.append(bitcoin_usd) if len(history_list) > freq: history_list.pop(0) arr_mean = round(float(np.mean(history_list)), 3) arr_var = round(float(np.var(history_list)), 3) arr_std = round(float(np.std(history_list)), 3) min_value = min(history_list) max_value = max(history_list) change_rate = round(100 * (max_value - min_value) / arr_mean, 2) return arr_mean, arr_var, arr_std, change_rate def main(): today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # receivers = '[email protected]' receivers = '[email protected]' frequency = 120 bitcoin_usd = get_bitcoin() arr_mean, arr_var, arr_std, change_rate = monitoring(bitcoin_usd, frequency) print('BitCoin: \t {} \t Current Price: ${} \t Change Rate: {}% \t 2HR Average: ${}'.format(today, bitcoin_usd, change_rate, arr_mean)) if change_rate > 5: send_mail(receivers, bitcoin_usd, arr_mean, arr_var, arr_std, change_rate) if __name__ == '__main__': history_list = list() while True: dt = datetime.now() + timedelta(minutes=1) dt = dt.replace(second=0) try: main() except Exception as e: print(e) while datetime.now() < dt: time.sleep(1)