| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 | 
							- from flask import Flask, request
 
- from mvp import Mvp
 
- import json
 
- from test_info import TestInfo
 
- from tongce import TongCe
 
- from apscheduler_elab import Config
 
- from flask_apscheduler import APScheduler
 
- import decimal
 
- from report_push import ReportPush
 
# Flask application object; every route in this module registers on it.
app = Flask(import_name=__name__)

# Scheduler configuration is currently disabled:
# app.config.from_object(Config())
 
@app.route('/score', methods=['GET', 'POST'])
def score():
    """Return the standardized values for the parent options as JSON.

    Reads the optional ``city``, ``age`` and ``crowd`` query-string
    parameters (each is ``None`` when absent) and forwards them to
    ``Mvp.query_behavioral_info``.

    :return: JSON text of the query result, with non-ASCII characters
        left unescaped.
    """
    city = request.args.get('city', default=None, type=str)
    age = request.args.get('age', default=None, type=str)
    crowd = request.args.get('crowd', default=None, type=str)
    print(city, age, crowd)

    mvp = Mvp()
    try:
        scores = mvp.query_behavioral_info(city, age, crowd)
    finally:
        # Always call close(), even when the query raises.
        mvp.close()
    return json.dumps(scores, ensure_ascii=False)
 
@app.route('/scores', methods=['GET', 'POST'])
def scores():
    """Return the result of ``Mvp.scores()`` as JSON.

    :return: JSON text, with non-ASCII characters left unescaped.
    """
    mvp = Mvp()
    try:
        data = mvp.scores()
    finally:
        # The other Mvp-backed handlers in this module close their
        # instance; this one previously never did.
        mvp.close()
    return json.dumps(data, ensure_ascii=False)
 
@app.route('/infos', methods=["GET", 'POST'])
def get_city_age_crowd():
    """Return the city, age-range and crowd-category info of the test data.

    The values are read from the ``citys``, ``age`` and ``crowd``
    attributes of a fresh ``Mvp`` instance.

    :return: JSON object text with the keys ``城市``, ``年龄段`` and
        ``人群分类``.
    """
    mvp = Mvp()
    try:
        infos = {'城市': mvp.citys, '年龄段': mvp.age, '人群分类': mvp.crowd}
    finally:
        # Always call close(), even if reading an attribute raises.
        mvp.close()
    return json.dumps(infos, ensure_ascii=False)
 
@app.route('/crowd_people', methods=['GET', 'POST'])
def crowd_people():
    """Return the head-count statistics per crowd category as JSON.

    :return: JSON text of ``Mvp.get_crowd_people()``.
    """
    mvp = Mvp()
    try:
        people_count = mvp.get_crowd_people()
    finally:
        # Always call close(), even when the query raises.
        mvp.close()
    return json.dumps(people_count, ensure_ascii=False)
 
@app.route('/set_behavior_tag', methods=['GET', 'POST'])
def set_behavior_tag():
    """Return the standardized values per module as JSON.

    :return: JSON text of ``Mvp.module_scores``.
    """
    mvp = Mvp()
    try:
        # Read the attribute while the instance is still open; it was
        # previously read only after close() had been called.
        module_scores = mvp.module_scores
    finally:
        mvp.close()
    return json.dumps(module_scores, ensure_ascii=False)
 
@app.route('/insert_into', methods=['GET', 'POST'])
def insert_info():
    """Run ``Mvp.insert()`` and return ``Mvp.query_data()`` as JSON.

    :return: JSON text of the data queried after the insert.
    """
    mvp = Mvp()
    try:
        mvp.insert()
        query_data = mvp.query_data()
    finally:
        # Always call close(), even when the insert or the query raises.
        mvp.close()
    return json.dumps(query_data, ensure_ascii=False)
 
@app.route('/shanghai_85', methods=['GET', 'POST'])
def shanghai_85():
    """Run ``Mvp.shanghai_85_module_score_insert()`` and return its result.

    :return: JSON text of the value returned by the insert call.
    """
    mvp = Mvp()
    try:
        data = mvp.shanghai_85_module_score_insert()
    finally:
        # Always call close(), even when the insert raises.
        mvp.close()
    return json.dumps(data, ensure_ascii=False)
 
@app.route('/tag_tree', methods=['GET', 'POST'])
def tag_tree():
    """Return ``Mvp.tag_data`` as JSON.

    :return: JSON text, with non-ASCII characters left unescaped.
    """
    mvp = Mvp()
    try:
        tags = mvp.tag_data
    finally:
        # Always call close(), even if reading the attribute raises.
        mvp.close()
    return json.dumps(tags, ensure_ascii=False)
 
@app.route('/update_data', methods=['GET', 'POST'])
def update_data():
    """Run ``Mvp.update_data()`` and return its message as JSON.

    :return: JSON text of the value returned by ``Mvp.update_data()``;
        when anything raises, ``{"error": "<exception text>"}`` instead.
    """
    # Keep the instance local: a module-level global would be shared by
    # every request handled by this process.
    mvp = None
    try:
        mvp = Mvp()
        message = mvp.update_data()
    except Exception as e:
        # Build a fresh dict; no message object exists yet when the
        # failure happens before update_data() returns.
        message = {'error': str(e)}
    finally:
        # The constructor itself may have failed, leaving nothing to close.
        if mvp is not None:
            mvp.close()
    return json.dumps(message, ensure_ascii=False)
 
@app.route('/people', methods=['GET', 'POST'])
def people():
    """Return the result of ``Mvp.people_data()`` as JSON.

    :return: JSON text, with non-ASCII characters left unescaped.
    """
    mvp = Mvp()
    try:
        # Query before closing; the call was previously made on an
        # instance that had already been closed.
        data = mvp.people_data()
    finally:
        mvp.close()
    return json.dumps(data, ensure_ascii=False)
 
@app.route('/update_gender')
def update_gender_rate():
    """Run ``Mvp.update_gender_rate(ids=1)`` and report the outcome as text.

    :return: a fixed completion message on success, otherwise the text
        of the exception that was raised.
    """
    mvp = Mvp()
    try:
        mvp.update_gender_rate(ids=1)
    except Exception as e:
        return str(e)
    finally:
        # Single cleanup point for both the success and the failure path.
        mvp.close()
    return '人群性别比列更新完成...'
 
@app.route('/test_api', methods=['GET', 'POST'])
def test_api():
    """Answer with a fixed success string; touches no other component."""
    reply = '成功'
    return reply
 
@app.route('/testcase_info', methods=['GET', 'POST'])
def testcase_info():
    """Return the detail info of one test case as JSON.

    The test case is selected by the integer ``id`` query parameter,
    which defaults to ``0`` when it is not supplied.

    :return: JSON text of ``TestInfo.test_detail_info(id)``.
    """
    case_id = request.args.get('id', default=0, type=int)
    detail = TestInfo().test_detail_info(case_id)
    return json.dumps(detail, ensure_ascii=False)
 
@app.route('/get_uuids', methods=['GET', 'POST'])
def get_uuids():
    """Return the result of ``Mvp.people_filter`` for the given filters.

    Reads the optional ``city``, ``age`` and ``crowd`` query-string
    parameters (each is ``None`` when absent).

    :return: JSON text of the filter result.
    """
    city = request.args.get('city', default=None, type=str)
    age = request.args.get('age', default=None, type=str)
    crowd = request.args.get('crowd', default=None, type=str)

    mvp = Mvp()
    try:
        uuids = mvp.people_filter(city, age, crowd)
    finally:
        # Always call close(), even when the filter query raises.
        mvp.close()
    return json.dumps(uuids, ensure_ascii=False)
 
@app.route('/tongce', methods=['GET', 'POST'])
def tongce():
    """Run ``TongCe.tongce()`` and wrap the outcome in a JSON envelope.

    :return: JSON text ``{"code": 0, "message": ..., "data": ...}`` on
        success, or ``{"code": 1, "message": ...}`` carrying the
        exception text on failure.
    """
    # NOTE(review): unlike tongce_data, this handler never calls close()
    # on the TongCe instance -- confirm whether that is intended.
    try:
        data = TongCe().tongce()
    except Exception as exc:
        reply = {'code': 1, 'message': '失败:' + str(exc)}
    else:
        reply = {'code': 0, 'message': '成功', 'data': data}
    return json.dumps(reply, ensure_ascii=False)
 
@app.route('/tongce_data', methods=['GET', 'POST'])
def tongce_data():
    """Run ``TongCe.lingdi_data_scores()`` and wrap it in a JSON envelope.

    The ``TongCe`` instance is closed whether or not the call succeeds.

    :return: JSON text ``{"code": 0, "message": ..., "data": ...}`` on
        success, or ``{"code": 1, "message": ...}`` carrying the
        exception text on failure.
    """
    client = TongCe()
    try:
        data = client.lingdi_data_scores()
    except Exception as exc:
        reply = {'code': 1, 'message': '失败:' + str(exc)}
    else:
        reply = {'code': 0, 'message': '成功', 'data': data}
    finally:
        client.close()
    return json.dumps(reply, ensure_ascii=False)
 
@app.route('/update_rule', methods=['GET', 'POST'])
def tongce_update_rule():
    """Run ``TongCe.table_type_insert()`` and report the outcome as text.

    :return: a fixed success message, or the text of the exception that
        was raised.
    """
    # NOTE(review): unlike tongce_data, this handler never calls close()
    # on the TongCe instance -- confirm whether that is intended.
    try:
        TongCe().table_type_insert()
    except Exception as exc:
        return str(exc)
    return '更新成功!! 1'
 
@app.route('/update_other_city', methods=['GET', 'POST'])
def update_other_city():
    """Run ``TongCe.other_city_clean()`` and wrap it in a JSON envelope.

    :return: JSON text ``{"code": 0, "message": ..., "data": ...}`` on
        success, or ``{"code": 1, "message": ...}`` carrying the
        exception text on failure.
    """
    # NOTE(review): unlike tongce_data, this handler never calls close()
    # on the TongCe instance -- confirm whether that is intended.
    try:
        data = TongCe().other_city_clean()
    except Exception as exc:
        reply = {'code': 1, 'message': '失败:' + str(exc)}
    else:
        reply = {'code': 0, 'message': '成功', 'data': data}
    return json.dumps(reply, ensure_ascii=False)
 
@app.route('/report_test', methods=['GET', 'POST'])
def report_test():
    """Run ``ReportPush('bi_report').report_push(id)`` and return JSON.

    The task is selected by the integer ``id`` query parameter, which
    defaults to ``0`` when it is not supplied.

    :return: JSON text of the push result, with ``Decimal`` values
        encoded through ``DecimalEncoder``; when anything raises,
        ``{"error": "<exception text>"}`` instead.
    """
    try:
        task_id = request.args.get('id', default=0, type=int)
        report_push = ReportPush('bi_report')
        result = report_push.report_push(task_id)
    except Exception as e:
        print(str(e))
        # Build a fresh local dict. The former module-level ``result``
        # was either unbound here or still held a previous request's data.
        result = {'error': str(e)}
    # Return outside any ``finally`` so that unrelated exceptions are not
    # silently discarded.
    return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
 
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``decimal.Decimal`` values as floats."""

    def default(self, o):
        """Convert *o* to a JSON-serializable value.

        :param o: an object the standard encoder could not serialize.
        :return: ``float(o)`` when *o* is a ``decimal.Decimal``.
        :raises TypeError: for any other type, via the base class.
        """
        if isinstance(o, decimal.Decimal):
            return float(o)
        # Return the base-class result so the method never falls through
        # to an implicit ``None``.
        return super().default(o)
 
if __name__ == '__main__':
    # The APScheduler start-up is currently disabled:
    # scheduler = APScheduler()
    # scheduler.init_app(app)
    # scheduler.start()

    # Bind to every interface on port 5001.
    run_options = {'host': '0.0.0.0', 'port': 5001}
    app.run(**run_options)
 
 
  |