120.76.116.128
Open in
urlscan Pro
120.76.116.128
Public Scan
URL:
http://120.76.116.128:8000/Desktop/dlyz_performance.py
Submission Tags: opendir censys opendir_recursion Search All
Submission: On January 07 via api from FI — Scanned from FI
Submission Tags: opendir censys opendir_recursion Search All
Submission: On January 07 via api from FI — Scanned from FI
Form analysis
0 forms found in the DOMText Content
from flask import * from flask_cors import CORS, cross_origin import os,sqlite3,pymongo,logging,json,xlrd import datetime,schedule import xlrd ''' /DATABASE/create read /DATABASE/TABLE/read_by ''' logging.basicConfig(level=logging.INFO,format="[%(asctime)s] %(name)s:%(levelname)s: %(message)s\n") app=Flask(__name__) CORS(app, supports_credentials=True) def execute_sql(sql,user="admin"): connection = sqlite3.connect('/home/tong/Desktop/dlyz.db') cursor = connection.cursor() # try: # ### writing log ### # _sql=sql.replace('"','\'') # transparent transfer # sql_log=f'INSERT INTO "log" (`name`,`time`,`user`) VALUES ("{_sql}","{str(datetime.datetime.now())}","{user}")' # # logging.debug(sql_log) # _ = cursor.execute(sql_log).fetchall() # connection.commit() # except: # print('Log ERROR: '+sql_log) result = cursor.execute(sql).fetchall() connection.commit() # logging.info(result) cursor.close() connection.close() return result class DatabaseManager: __all__=('sql_*', 'matrix2dict', 'initiate_database') def sql_drop(name): return f'DROP TABLE IF EXISTS "{name}";' def sql_create(name,*column): return f'CREATE TABLE "{name}" (id INTEGER PRIMARY KEY AUTOINCREMENT, '+' TEXT, '.join(column)+' TEXT);' def sql_insert(name,column:list,value:list): return f'INSERT INTO "{name}" (`'+'`,`'.join(column)+'`) VALUES ("'+'","'.join(value)+'");' def sql_read(name,*column,**kwargs): sql = 'SELECT '+','.join(column)+f' FROM {name} WHERE 1=1' for key in kwargs: sql = sql + ' AND '+str(key)+'="'+str(kwargs[key]+'"') sql=sql+';' return sql def sql_read_by_id(name,id,*column): return f'SELECT '+','.join(column)+f' FROM {name} WHERE id={str(id)};' def sql_read_one_by_id(name,id,column): return f'SELECT {column} FROM {name} WHERE id={str(id)};' def sql_delete_by_id(name,id): return f'DELETE FROM "{name}" WHERE "id" = {str(id)};' # not tested def sql_update(name,key,value,**kwargs): sql=f'UPDATE {name} SET {key} = "{str(value)}" WHERE 1=1' for key in kwargs: sql = sql + ' AND '+str(key)+'="'+str(kwargs[key]+'"') sql=sql+';' return sql def sql_update_by_id(name,id,key,value): return f'UPDATE {name} SET {key} = "{str(value)}" WHERE id={id}' def matrix2dict(matrix,*keys): result=[] for l in matrix: d = dict() for index,key in enumerate(keys): d[key]=l[index] result.append(d) return result @app.route('/database/create',methods=['post']) def initiate_database(): connection = sqlite3.connect('app/dlyz.db') cursor = connection.cursor() st = xlrd.open_workbook('database.xls').sheet_by_index(0) for row in range(1,st.nrows): _ = st.row_values(row) update = _[0] row_values = [i for i in _[1:] if i] if update=='t': # print(row_values) table_name = row_values[0] cursor.execute(DatabaseManager.sql_drop(table_name)) cursor.execute(DatabaseManager.sql_create(*row_values)) connection.commit() cursor.close() connection.close() return 'Database init' # @app.route('/database/add_bulk',methods=['post']) # def add_bulk(): # f=request.files['file'] # path = os.path.join('app/dlyz_performance','account.xls') # f.save(path) # wb = xlrd.open_workbook_xls(path) # st = wb.sheet_by_index(0) # for index,row in enumerate(range(1,st.nrows)): # r=st.row_values(row) # execute_sql(DatabaseManager.sql_insert('account', ['name','account','password','group_'], r)) # return '0' @app.route('/database/read',methods=['post']) def read_database(): data=json.loads(request.data) tablename=data['tablename'] column=data['column'].split(',') try: kwargs=data['kwargs'] except: kwargs=dict() # print(DatabaseManager.sql_read(tablename,*column,**kwargs)) result=execute_sql(DatabaseManager.sql_read(tablename,*column,**kwargs)) result=DatabaseManager.matrix2dict(result,*column) # print('db_read:',result) return jsonify(result) @app.route('/database/mongo_read',methods=['post']) def mongo_read_database(): mongodb_client=pymongo.MongoClient('mongodb://localhost:27017/') mongodb_database=mongodb_client['dlyz_performance'] mongodb_collection=mongodb_database['test'] result=list(mongodb_collection.find()) for i in result: del i['_id'] print(list(result)) return json.dumps(result) class FlaskManager: __all__=['test', 'url_map'] @app.route('/',methods=['get','post']) @app.route('/test',methods=['get','post']) def test(): print(request.headers) print(request.headers['User-Agent']) print('IP: ',request.remote_user,request.environ['REMOTE_ADDR'],request.environ.get('HTTP_X_FORWARDED_FOR', request.remote_addr)) print(request.args) print(request.data.decode()) print(request.form) print(request.files) return 'test' @app.route('/flask/url_map',methods=['post']) def url_map(): return jsonify(list(map(lambda i:str(i),app.url_map.iter_rules()))) class StaffManager: @app.route('/database/staff/read_multi_group_',methods=['post']) def staff_read_multi_group_(): groups=sorted(set([i[0] for i in execute_sql(DatabaseManager.sql_read('account','group_'))]),reverse=True) groups.remove('行政') groups.append('行政') groups.reverse() d={'group_':groups} for i in groups: result=execute_sql(DatabaseManager.sql_read('account','value',group_=i)) result=DatabaseManager.matrix2dict(result,'value') d[i]=result # print(d) return jsonify(d) class CourseManager: @app.route('/database/cource/init',methods=['post']) def cource_init(): class_list=[] class_=[22,22,18] subject='è¯ æ•° 英 é“ ç‰© 化 历 地 生 体 音 美 电 心 综 劳'.split(' ') grade = ['一','二','三'] week = '一 二 三 å›› 五'.split(' ') time_ = '1 2 3 4 5 6 7 8 9'.split(' ') # for index,i in enumerate(class_): # for j in range(i): # k=str(grade[index])+str(j+1) # k is class. # class_list.append(k) # for l in subject: # execute_sql(DatabaseManager.sql_insert('course2teacher', ['class','subject'], [k,l])) # for l in week: # for m in time_: # execute_sql(DatabaseManager.sql_insert('course2class', ['class','week','time'], [k,l,m])) wb = xlrd.open_workbook('åˆä¸€è¯¾ç¨‹è¡¨(22-23上汇总).xls') st = wb.sheet_by_index(0) new_matrix=[] for row in range(st.nrows): last='' new_row=[] for cell in st.row_values(row): if cell: last=cell new_row.append(cell) else: new_row.append(last) new_matrix.append(new_row) row = 2 column=1 for i,row_value in enumerate(new_matrix): for j,value in enumerate(row_value): if i>=row and j>=1: week_=new_matrix[0][j].replace(' ', '') class_=new_matrix[1][j] time_=new_matrix[i][0] try: time_=str(int(time_)) except: time_='' value=value.rstrip() if len(str(value))==2: value=value[0] elif 'å•' in value: value=value.replace('/', '').replace('å•', '').replace('åŒ', '/') elif 'ç¬¬äºŒè¯¾å ‚' in value: value=value.replace('ç¬¬äºŒè¯¾å ‚', '') elif 'æ ¡æœ¬' in value: value=value[2] elif 'æ•™ å·¥' in value: value=value.replace('æ•™ å·¥ å¦ ä¹ ï¼ˆå¦ç”Ÿè‡ªä¸»ä½œä¸šï¼‰', '') else: pass execute_sql(DatabaseManager.sql_insert('course2class', ['subject','week','class_','time','type'], [value,week_,class_,time_,'20232'])) return 'cource/init' @app.route('/database/cource/read_by_class',methods=['post']) def cource_read_by_class(): class_=request.args['class_'] result=[] for i in range(1,9): i=str(i) result2=execute_sql(DatabaseManager.sql_read('course2class','subject',class_=class_,time=i)) # [('æ•°',), ('体',), ('英',), ('è¯',), ('体',)] result.append({ 'id':i, '星期一':result2[0][0], '星期二':result2[1][0], '星期三':result2[2][0], '星期四':result2[3][0], '星期五':result2[4][0], }) print(result) return jsonify(result) @app.route('/test/wechat',methods=['get','post']) def GetWechat(): with open('wechatmp2.txt') as f: j=[] for i in f.read().split('\n'): if i: i=i.split('--*--') j.append({'title':i[0],'state':'未åŒæ¥','date':i[1],'link':i[2]}) return j @app.route('/test2',methods=['post']) def XlsToSqlite(): xls = xlrd.open_workbook('app/generator/dlyz_performance/account.xls') sheet = xls.sheet_by_index(0) for col in range(sheet.ncols): print(sheet.cell_value(0,col)) return '' @app.route('/database/readxls',methods=['post']) def readxls(): data=json.loads(request.data) @app.route('/login',methods=['post']) def login(): data = request.data data=json.loads(data) if data['username'] =='admin' and data['password']=='passwd': return 'True' if execute_sql(DatabaseManager.sql_read('account','password',account=data['username']))[0][0] == data['password']: return 'True' return 'False' return '' app.run(host='0.0.0.0',port='8890',debug=True,threaded=True)