| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 | from flask import Flask, requestfrom mvp import Mvpimport jsonfrom test_info import TestInfofrom tongce import TongCefrom apscheduler_elab import Configfrom flask_apscheduler import APSchedulerfrom email_util import EmailUtilimport decimalfrom report_push import ReportPushapp = Flask(__name__)# app.config.from_object(Config())@app.route('/score', methods=['GET', 'POST'])def score():    """        父选项对应的标准化值    :return:    """    city = request.args.get('city', default=None, type=str)    age = request.args.get('age', default=None, type=str)    crowd = request.args.get('crowd', default=None, type=str)    print(city, age, crowd)    mvp = Mvp()    scores = mvp.query_behavioral_info(city, age, crowd)    mvp.close()    return json.dumps(scores, ensure_ascii=False)@app.route('/scores', methods=['GET', 'POST'])def scores():    mvp = Mvp()    data = mvp.scores()    return json.dumps(data, ensure_ascii=False)@app.route('/infos', methods=["GET", 'POST'])def get_city_age_crowd():    """        测试数据中城市 年龄 人群分类信息    :return:    """    mvp = Mvp()    infos = {'城市': mvp.citys, '年龄段': mvp.age, '人群分类': mvp.crowd}    mvp.close()    return json.dumps(infos, ensure_ascii=False)@app.route('/crowd_people', methods=['GET', 'POST'])def crowd_people():    """        人群分类人数统计    :return:    """    mvp = Mvp()    people_count = mvp.get_crowd_people()    mvp.close()    return json.dumps(people_count, ensure_ascii=False)@app.route('/set_behavior_tag', methods=['GET', 'POST'])def set_behavior_tag():    """        模块标准化值    :return:    """    mvp = Mvp()    mvp.close()    return json.dumps(mvp.module_scores, ensure_ascii=False)@app.route('/insert_into', methods=['GET', 'POST'])def insert_info():    mvp = Mvp()    mvp.insert()    query_data = mvp.query_data()    mvp.close()    return json.dumps(query_data, ensure_ascii=False)@app.route('/shanghai_85', methods=['GET', 'POST'])def shanghai_85():    mvp = Mvp()    data = mvp.shanghai_85_module_score_insert()    mvp.close()    return json.dumps(data, ensure_ascii=False)@app.route('/tag_tree', methods=['GET', 'POST'])def tag_tree():    mvp = Mvp()    tags = mvp.tag_data    mvp.close()    return json.dumps(tags, ensure_ascii=False)@app.route('/update_data', methods=['GET', 'POST'])def update_data():    message = None    global mvp    try:        mvp = Mvp()        message = mvp.update_data()    except Exception as e:        message['error'] = str(e)        return json.dumps(message, ensure_ascii=False)    finally:        mvp.close()    return json.dumps(message, ensure_ascii=False)@app.route('/people', methods=['GET', 'POST'])def people():    mvp = Mvp()    mvp.close()    return json.dumps(mvp.people_data(), ensure_ascii=False)@app.route('/update_gender')def update_gender_rate():    mvp = Mvp()    try:        mvp.update_gender_rate(ids=1)        mvp.close()    except Exception as e:        mvp.close()        return str(e)    return '人群性别比列更新完成...'@app.route('/test_api', methods=['GET', 'POST'])def test_api():    return '成功'@app.route('/testcase_info', methods=['GET', 'POST'])def testcase_info():    testcase_id = request.args.get('id', default=0, type=int)    ti = TestInfo()    result = ti.test_detail_info(testcase_id)    return json.dumps(result, ensure_ascii=False)@app.route('/get_uuids', methods=['GET', 'POST'])def get_uuids():    city = request.args.get('city', default=None, type=str)    age = request.args.get('age', default=None, type=str)    crowd = request.args.get('crowd', default=None, type=str)    mvp = Mvp()    uuids = mvp.people_filter(city, age, crowd)    mvp.close()    return json.dumps(uuids, ensure_ascii=False)@app.route('/tongce', methods=['GET', 'POST'])def tongce():    response = {}    try:        tongce = TongCe()        result = tongce.tongce()        response['code'] = 0        response['message'] = '成功'        response['data'] = result    except Exception as e:        response['code'] = 1        response['message'] = '失败:' + str(e)        return json.dumps(response, ensure_ascii=False)    return json.dumps(response, ensure_ascii=False)@app.route('/tongce_data', methods=['GET', 'POST'])def tongce_data():    response = {}    tongce = TongCe()    try:        result = tongce.lingdi_data_scores()        response['code'] = 0        response['message'] = '成功'        response['data'] = result    except Exception as e:        response['code'] = 1        response['message'] = '失败:' + str(e)        return json.dumps(response, ensure_ascii=False)    finally:        tongce.close()    return json.dumps(response, ensure_ascii=False)@app.route('/update_rule', methods=['GET', 'POST'])def tongce_update_rule():    try:        tongce = TongCe()        tongce.table_type_insert()    except Exception as e:        return str(e)    return '更新成功!! 1'@app.route('/send_mail', methods=['GET', 'POST'])def send_mail():    mail = EmailUtil()    mail.send_test()    return '<h1>邮件发送成功</h1>'@app.route('/update_other_city', methods=['GET', 'POST'])def update_other_city():    response = {}    try:        tongce = TongCe()        result = tongce.other_city_clean()        response['code'] = 0        response['message'] = '成功'        response['data'] = result    except Exception as e:        response['code'] = 1        response['message'] = '失败:' + str(e)        return json.dumps(response, ensure_ascii=False)    return json.dumps(response, ensure_ascii=False)@app.route('/report_test', methods=['GET', 'POST'])def report_test():    global result    try:        task_id = request.args.get('id', default=0, type=int)        report_push = ReportPush('bi_report')        result = report_push.report_push(task_id)    except Exception as e:        print(str(e))        result['error'] = str(e)    finally:        return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)class DecimalEncoder(json.JSONEncoder):    def default(self, o):        if isinstance(o, decimal.Decimal):            return float(o)        super(DecimalEncoder, self).default(o)if __name__ == '__main__':    # scheduler = APScheduler()    # scheduler.init_app(app)    # scheduler.start()    app.run(        host='0.0.0.0',        port=5001    )
 |