| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 | from flask import Flask, requestimport jsonfrom apscheduler_elab import Configfrom flask_apscheduler import APSchedulerimport decimalfrom report_push import ReportPushfrom jianye_report import JianYeReportapp = Flask(__name__)app.config.from_object(Config())@app.route('/report_test', methods=['GET', 'POST'])def report_test():    global result    try:        task_id = request.args.get('id', default=0, type=int)        report_push = ReportPush('bi_report')        result = report_push.report_push(task_id)    except Exception as e:        print(str(e))        result['error'] = str(e)    finally:        return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)@app.route('/report_jianye', methods=['GET', 'POST'])def report_jianye():    report_jianye = JianYeReport()    task_id = request.args.get('id', default=0, type=int)    result = report_jianye.send_mail_to_customer(task_id)    report_jianye.db.close()    return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)@app.route('/debug_test', methods=['GET', 'POST'])def debug_func():    result = {}    rj = JianYeReport()    try:        # data = rj.brand_data()        house_ids = rj.get_house_id_by_brand_id('13')        # result['houseids'] = house_ids        # result['data1'] = data        # content = rj.get_brand_content(MailContentText.text_1, data)        # result['content'] = 'success'        table_2 = rj.house_data(house_ids)        result['data2'] = table_2    except Exception as e:        result['error'] = str(e)    finally:        return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)@app.route('/send_mail_to_customer', methods=['GET', 'POST'])def send_mail_to_customer():    result = {}    rj = JianYeReport()    try:        customer_id = request.args.get('id', default=0, type=int)        mail = request.args.get('mail', default=None, type=str)        data = rj.send_mail_for_customer_id(customer_id, mail)        result['data'] = data        pass    except Exception as e:        result['error'] = str(e)    return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)class DecimalEncoder(json.JSONEncoder):    def default(self, o):        if isinstance(o, decimal.Decimal):            return float(o)        super(DecimalEncoder, self).default(o)if __name__ == '__main__':    scheduler = APScheduler()    scheduler.init_app(app)    scheduler.start()    app.run(        host='0.0.0.0',        port=5001    )
 |