The news articles are gathered by crawlers built on the Scrapy framework; the Airflow workflow platform is used to manage and monitor the crawl tasks (either with the distributed CeleryExecutor or with the multi-process LocalExecutor). The following covers the installation and configuration of Airflow.
System environment: CentOS Linux release 7.4.1708 (Core)
Kernel: Linux version 3.10.0-693.2.2.el7.x86_64
IP addresses:
47.104.191.52 (public)
172.31.178.92 (private)
Download link 1 (official site)
Download link 2 (Tsinghua open-source mirror)
Download the installer for your platform and upload it to /opt on the Linux host.
1. Run the installer
cd /opt
sh Anaconda3-5.2.0-Linux-x86_64.sh
(Press Enter until the >>> prompt appears, then type yes. Use /opt/anaconda3 as the installation directory.)
2. Configure the environment variables
echo "export PATH=/opt/anaconda3/bin:$PATH" >> /etc/profile
source /etc/profile
MySQL serves as Airflow's metadata database, storing Airflow's state; Redis serves as Celery's broker and result backend (RabbitMQ also works). If you do not use the CeleryExecutor, the Redis configuration is not needed.
Create the news_push virtual environment with Anaconda
/opt/anaconda3/bin/conda create -y --name news_push python=3.6.5
Installing and configuring Airflow
Activate the news_push virtual environment
source activate news_push
Install Airflow with pip
pip install apache-airflow
Configure the Airflow home directory (create the /opt/NewsPush project directory first)
echo "export AIRFLOW_HOME=/opt/NewsPush/airflow" >> /etc/profile
source /etc/profile
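The first airflow command run after this creates a default airflow.cfg under /opt/NewsPush/airflow; that is the file edited in the configuration steps below.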
Initialize the database
airflow initdb
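At this point Airflow still runs on its defaults, a SQLite metadata database and the SequentialExecutor; the switch to MySQL and Celery follows below.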
Start Airflow
airflow webserver -p 5556
Then open http://ip:5556/admin/ in a browser.
Configuring Airflow
- Switch the metadata database to MySQL
Add the following parameter to the MySQL configuration file (/etc/my.cnf) and restart MySQL (Airflow's initdb requires this setting when MySQL is the backing database):
explicit_defaults_for_timestamp=true
Log in to MySQL
mysql -uroot -p
(enter the password at the prompt)
Create the airflow user
create user 'airflow'@'localhost' identified by 'airflow';
Create the airflow database
create database airflow;
Grant privileges
grant all privileges on airflow.* to 'airflow'@'%' identified by 'airflow';
flush privileges;
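Before wiring Airflow to this database, the new account can be sanity-checked from Python. A minimal sketch, assuming the PyMySQL package installed a few steps below is already available; host and password are the values used in this walkthrough:

import pymysql

# Connect with the airflow account created above
conn = pymysql.connect(host='localhost', user='airflow',
                       password='airflow', db='airflow')
try:
    with conn.cursor() as cur:
        cur.execute('SELECT VERSION()')
        print(cur.fetchone())  # prints the MySQL server version
finally:
    conn.close()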
Edit the Airflow configuration file
vim /opt/NewsPush/airflow/airflow.cfg
Change the following settings:
executor = CeleryExecutor
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
load_examples = False
endpoint_url = http://localhost:5556
base_url = http://localhost:5556
web_server_port = 5556
broker_url = redis://172.31.178.92:6379/3
celery_result_backend = redis://172.31.178.92:6379/4
flower_port = 5557
Install Celery support and Celery's Redis component
pip install apache-airflow[celery]
pip install celery[redis]
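Since celery[redis] pulls in the redis client, the broker and result backend configured in airflow.cfg can be checked from Python. A minimal sketch using the host and database numbers from above; adjust them to your environment:

import redis

# Ping the Celery broker (db 3) and result backend (db 4)
for db in (3, 4):
    r = redis.Redis(host='172.31.178.92', port=6379, db=db)
    print(db, r.ping())  # True means the database is reachable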
Install MySQL-python
yum install MySQL-python
pip install PyMySQL==0.7.1
PyMySQL is pinned to 0.7.1 because version 0.8.0 and later emit warnings such as:
/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 chara
  result = self._query(query)
Re-initialize the database
airflow initdb
Troubleshooting
Error message:
Traceback (most recent call last):
File "/opt/anaconda3/envs/news_push/bin/airflow", line 17, in <module>
from airflow import configuration
File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/__init__.py", line 30, in <module>
from airflow import settings
File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/settings.py", line 159, in <module>
configure_orm()
File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/airflow/settings.py", line 147, in configure_orm
engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/engine/__init__.py", line 424, in create_engine
return strategy.create(*args, **kwargs)
File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 81, in create
dbapi = dialect_cls.dbapi(**dbapi_args)
File "/opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 102, in dbapi
return __import__('MySQLdb')
ModuleNotFoundError: No module named 'MySQLdb'
Fix (MySQLdb lacks Python 3.x support)
vim /opt/anaconda3/envs/news_push/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py
(the .py path comes from the last file line of the traceback)
Add at the top of the file:
import pymysql
pymysql.install_as_MySQLdb()
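An alternative that avoids patching a file inside site-packages is to point SQLAlchemy at the PyMySQL driver directly, by changing sql_alchemy_conn in airflow.cfg to mysql+pymysql://airflow:airflow@localhost:3306/airflow.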
Initialize again
airflow initdb
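If the fix took effect, initdb now creates its tables in the MySQL airflow database; running show tables there should list tables such as dag and task_instance.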
Starting and testing Airflow
Create a DAG (/opt/NewsPush/airflow/dags/hello_world.py)
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1)  # required; prefer a fixed time. A dynamic current time causes unexpected behavior: the task first runs once, then follows start_date and schedule_interval
}

dag = DAG(
    'example_hello_world_dag',
    default_args=default_args,
    description='my first DAG',
    # schedule_interval=timedelta(days=1)
    schedule_interval='0 */1 * * *'  # run once every hour
)

def print_hello():
    return 'Hello World!'

hello_operator = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag
)
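The BashOperator imported above goes unused in this minimal DAG. As a sketch, a shell task could be chained in front of hello_task; the task id and command here are invented for illustration:

date_operator = BashOperator(
    task_id='print_date_task',  # hypothetical task id
    bash_command='date',        # print the current time
    dag=dag
)

# run print_date_task first, then hello_task
date_operator >> hello_operator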
Starting Airflow
Each of the commands below is started in its own terminal window so the logs are easy to watch (they can also run in the background; see the note after the list).
Note: avoid starting the Celery worker as root. If you must use root, add the environment variable below, since Celery otherwise refuses to run as root.
If you start the worker as another user, start the other Airflow processes as that user too, and change the project directory's ownership to that user; otherwise the worker cannot write its logs and will fail.
echo "export C_FORCE_ROOT=true" >> /etc/profile
source /etc/profile
airflow webserver    # start the Airflow web UI
airflow scheduler    # start the scheduler; it drives task runs, but DAGs are paused by default and must be switched on in the web UI
airflow worker       # start a Celery worker
airflow flower       # start the Flower monitoring UI
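Each of these commands also accepts -D to run as a daemon instead of occupying a window, e.g. airflow webserver -p 5556 -D. A single task can also be exercised without the scheduler using the 1.x CLI, e.g. airflow test example_hello_world_dag hello_task 2018-06-01, which runs hello_task once for that (example) execution date without recording any state.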
Adding the Linux user, group, and password
groupadd airflow              # create the airflow group
useradd -g airflow airflow    # add the airflow user to the airflow group
passwd airflow                # set its password
Change the project directory's ownership to the startup user (airflow)
chown -R airflow:airflow /opt/NewsPush/
Airflow web UI: http://47.104.191.52:5556/admin
Flower web UI: http://47.104.191.52:5557/