批量清理mongodb历史数据
清理程序的由来
- 目前项目组很多平台上线后历史数据大量积压，导致入库和查询数据缓慢；部分历史数据已经归档，因此需要对历史数据进行清理删除。
- 之前临时写的 shell 脚本过于简陋，现改用 Python 重新实现，新增了备份功能，并支持通过配置文件按指定字段和时间范围删除数据。
代码篇
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 | #!/usr/local/python3/bin/python3 import configparser,logging.config,sys,os,subprocess import pymongo,ast # from pymongo import MongoClient from datetime import datetime,timedelta from urllib import parse def init_mongodb(MongoDBAuth): if mongodb_auth: username = parse.quote_plus(mongodb_user) password = parse.quote_plus(mongodb_passwd) ConnPasswd = "mongodb://" + username + ":" + password + "@" + mongodb_ip + ":" + mongodb_port + "/" try : clients = pymongo.MongoClient(ConnPasswd) logger.info( "init mongodb conn: " + ConnPasswd) return clients except Exception as e: logger.info( "use mongodb user pass conn err: " + str(e)) return False else : try : clients = pymongo.MongoClient(mongodb_ip, int (mongodb_port)) logger.info( "init mongodb conn: " + mongodb_ip + ":" +mongodb_port) return clients except 
Exception as e: logger.info( "use mongodb user pass conn err: " + str(e)) return False #查看出全部db def get_mongodb_dbname(): db_names_list = [] db_names = mongo_client.list_database_names() for db_name in db_names: db_names_list.append(db_name) for filter_dbname in need_filter_dbname_list: if filter_dbname in db_names_list: db_names_list.remove(filter_dbname) logger.info( "delete need filter dbname: " + filter_dbname) # logger.info( "get all db_name: " +str(db_names_list)) return db_names_list #查询出db中全部表 def get_mongodb_tables(entid): db_collections_list = [] db=mongo_client[entid] collections = db.list_collection_names() for collection in collections: db_collections_list.append(collection) logger.debug( "get " + entid + " all collections: " +str(db_collections_list)) return db_collections_list #查询集合中索引索引和是否分片 def get_index_key_tables(entid,collection_name): index_list = [] formatted_results = [] db=mongo_client[entid] collection=db[collection_name] indexes = collection.list_indexes() ns_name = entid + "." + collection_name for result in indexes: formatted_result = {k.upper(): v for k, v in result.items()} each_key = formatted_result.get( "KEY" ) ns_name = formatted_result.get( "NS" ) ok_index = {key: value for key, value in each_key.items()} index_list.append(ok_index) index_list = result = [d for d in index_list if not (isinstance(d, dict) and '_id' in d and d[ '_id' ] == 1 )] collection_stats = db.command( "collstats" , collection_name) collection_sharded = collection_stats.get( "sharded" , False) if len(index_list) != 0 : logger.debug( "get collection " + ns_name + " index: " +str(index_list)) #logger.info( "get now In the collection " + ns_name + " sharded status: " +str(collection_sharded)) return index_list,collection_sharded #创建集合索引 def craete_index(entid,collection_name,index): db=mongo_client[entid] collection=db[collection_name] logger.info( "need craete index: " + entid + "." 
+collection_name + " : " + str(index)) # index = (list(index.keys())[ 0 ], list(index.values())[ 0 ]) index = [(k, v) for k, v in index.items()] result = collection.create_index(index) logger.info( "mongodb " +entid + "." +collection_name + " create index return msg: " + str(result) ) #查看对应dbname是否已经是shards,弃用 def is_database_sharded(database_name): db = mongo_client[ "admin" ] sharded_databases = db.command( "listshards" )[ "shards" ] for shard in sharded_databases: if database_name in db.command( "listdatabases" )[ "databases" ]: return True return False #创建分片索引片键 def create_sharded_func(entid, collection_name, shard_key): db = mongo_client[ "admin" ] collection_path = '{}.{}' .format(entid, collection_name) logger.info( "need craete sharded key : " + collection_path + " : " + str(shard_key)) sharding_colunm,sharding_type = "" , "" for key, value in shard_key.items(): sharding_colunm= key sharding_type = value try : db.command( 'enableSharding' , entid) except Exception as e: logger.error( "create dbname sharded key error: return: " + str(e)) try : result = db.command( 'shardCollection' , collection_path,key = {sharding_colunm:sharding_type}) logger.info(entid + "." 
+ collection_path + " create sharded key return: " + str(result)) except Exception as e: logger.error( "create sharded key error: return: " + str(e)) #读取文件获取对应索引和片键key信息 def read_file_index(index_file): index_list = [] Shard_list = [] with open(index_file, 'r' ) as f: for line in f.readlines(): line = line.replace( " " , "" ) #通过mongodbShard: 来区分那个片键的可以,写 # print(line) if "mongodbShard:" not in line: table, key_str = line.strip().split( "=" ) key = ast.literal_eval(key_str) index_list.append({table: key}) else : Shard_key_str = line.strip().split( "mongodbShard:" )[ 1 ] Shard_key_str = ast.literal_eval(Shard_key_str) Shard_list.append(Shard_key_str) return index_list,Shard_list #获取多少天前的时间戳 def get_timestamp_days_ago(get_days): # 获取当前日期和时间 now = datetime.now() # 减去 30 天 date_30_days_ago = now - timedelta(days= int (get_days)) # 将结果转换为当天的整点 00 : 00 : 00 date_start_of_day = date_30_days_ago.replace(hour= 0 , minute= 0 , second= 0 , microsecond= 0 ) # 将结果转换为时间戳 timestamp = int (date_start_of_day .timestamp()) return timestamp #判断字符串类型和长度对应返回需要删除的时间字段值 def if_string_type(data_stamp): del_timestamp = "" get_need_del_timestamp = get_timestamp_days_ago( int (Del_day)) if isinstance(data_stamp, str) and len(data_stamp) == 10 : del_timestamp = str(get_need_del_timestamp) if isinstance(data_stamp, str) and len(data_stamp) == 13 : del_timestamp = str(get_need_del_timestamp) + "000" if isinstance(data_stamp, int ) and len(str(data_stamp)) == 10 : del_timestamp = get_need_del_timestamp if isinstance(data_stamp, int ) and len(str(data_stamp)) == 13 : del_timestamp = int (get_need_del_timestamp) * 1000 return del_timestamp #获取该集合中一条数据 def get_one_data(entid,collection_name): db=mongo_client[entid] collection=db[collection_name] Filter_conditions_key = str(need_del_table_field) result = collection.find_one({}, {**{Filter_conditions_key: 1 }, '_id' : 0 }) if result and Filter_conditions_key in result: start_time_value = result.get(Filter_conditions_key) logger.debug( "get " + entid 
+ "." + collection_name + " Corresponding " +Filter_conditions_key + " field value: " + str(start_time_value) ) return start_time_value else : # logger.info( "No " +Filter_conditions_key + " field found in the document. return: " + str(result) ) return False # 按照日期删除该集合中历史数据 def del_data(entid,collection_name,get_del_timestamp): db=mongo_client[entid] collection=db[collection_name] Filter_conditions_key = str(need_del_table_field) Filter_conditions_value = get_del_timestamp try : result = collection.delete_many({Filter_conditions_key: { "$lt" : Filter_conditions_value}}) logger.info(entid + " run sql: db" + "." +collection_name+ ".remove({" +Filter_conditions_key+ ":" + "{$lt:" +str(Filter_conditions_value) + "})" ) if result.deleted_count > 0 : logger.info( "By date delete " + str(entid) + "." + collection_name + " less than " + str(get_del_timestamp) + " del document count: " + str(result.deleted_count)) except Exception as e: logger.error( "Error occurred while deleting documents: " + str(e)) # 删除该集合中全部历史数据 def del_all_data(entid,collection_name): db=mongo_client[entid] collection=db[collection_name] try : result = collection.delete_many({}) if result.deleted_count > 0 : logger.info(entid + " run sql: db" + "." +collection_name+ ".remove({})" ) logger.info(entid + "." + collection_name + " del all document count: " + str(result.deleted_count)) except Exception as e: logger.info(entid + "." + collection_name + " del all document error: " + str(result) ) # 备份数据 def dump_mongodb_data(dbname,table,not_quiet_dump,del_time): status_info = [ "1" ] if is_del_bakcup_data: if os.path.exists(mongodump_command_path): run_status = " && echo $?" 
run_commnd = "" if not_quiet_dump: if mongodb_auth: #run_commnd = mongodump_command_path + " -h " + mongodb_ip + ":" + str(mongodb_port) + " --authenticationDatabase=" +mongodb_auth_db + " -u " + mongodb_user + " -p " + mongodb_passwd + " -d " + dbname + " -c " + table + " -q '{" + need_del_table_field + ": {" + + "}}'" + " -o " + bakcup_dir_path run_command = f "{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} --authenticationDatabase={mongodb_auth_db} -u {mongodb_user} -p {mongodb_passwd} -d {dbname} -c {table} -q '{{" {need_del_table_field} ": {{" $lt ": " {del_time} "}}}}' -o {bakcup_dir_path}" else : # run_commnd = mongodump_command_path + " -h " + mongodb_ip + ":" + str(mongodb_port) + " -d " + dbname + " -c " + table + " -o " + bakcup_dir_path run_commnd = f "{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} -d {dbname} -c {table} -q '{{" {need_del_table_field} ": {{" $lt ": " {del_time} "}}}}' -o {bakcup_dir_path}" else : if mongodb_auth: # run_commnd = mongodump_command_path + " -h " + mongodb_ip + ":" + str(mongodb_port) + " --authenticationDatabase=" +mongodb_auth_db + " -u " + mongodb_user + " -p " + mongodb_passwd + " -d " + dbname + " -c " + table + " -o " + bakcup_dir_path run_command = f "{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} --authenticationDatabase={mongodb_auth_db} -u {mongodb_user} -p {mongodb_passwd} -d {dbname} -c {table} -o {bakcup_dir_path}" else : # run_commnd = mongodump_command_path + " -h " + mongodb_ip + ":" + str(mongodb_port) + " -d " + dbname + " -c " + table + " -o " + bakcup_dir_path run_commnd = f "{mongodump_command_path} -h {mongodb_ip}:{mongodb_port} -d {dbname} -c {table} -o {bakcup_dir_path}" logger.info( "run command: " + run_commnd) try : msg = os.popen(run_commnd + run_status) status_info = [line.strip() for line in msg.readlines()] logger.info( "mongodump command result: " + str(status_info)) except Exception as e: logger.error( "mongodump command error: " + str(e)) else : logger.info( 
"mongodump command file not exists ," + mongodump_command_path) else : logger.debug( "config file not set is_del_bakcup_data = True, not dump data" ) return status_info if __name__== "__main__" : cfgpath = "./cfg/config.ini" conf = configparser.ConfigParser() conf.read(cfgpath) mongodb_ip = conf.get( "main" , "mongodb_ip" ) mongodb_port = conf.get( "main" , "mongodb_port" ) mongodb_auth = conf.getboolean( "main" , "mongodb_auth" ) mongodb_user = conf.get( "main" , "mongodb_user" ) mongodb_passwd = conf.get( "main" , "mongodb_passwd" ) mongodb_auth_db = conf.get( "main" , "mongodb_auth_db" ) need_filter_dbname = conf.get( "main" , "need_filter_dbname" ) is_del_bakcup_data = conf.getboolean( "main" , "is_del_bakcup_data" ) bakcup_dir_path = conf.get( "main" , "bakcup_dir_path" ) mongodump_command_path = conf.get( "main" , "mongodump_command_path" ) Del_day = conf.get( "main" , "Del_day" ) need_del_table_field = conf.get( "main" , "need_del_table_field" ) need_del_table_list = conf.get( "main" , "need_del_table_list" ) need_del_table_list = [item for item in need_del_table_list.split( "," ) if item != '' ] need_del_null_table_list = conf.get( "main" , "need_del_null_table_list" ) need_del_null_table_list = [item for item in need_del_null_table_list.split( "," ) if item != '' ] auth_get_entid = conf.getboolean( "main" , "auth_get_entid" ) need_filter_dbname_list = [item for item in need_filter_dbname.split( "," ) if item != '' ] #获取配置项 all_ent_id = conf.get( "main" , "ent_id" ) get_dbname_list = all_ent_id.split( "," ) logging.config.fileConfig( "./cfg/logger.conf" ) logger = logging.getLogger( "rotatfile" ) # 初始化 MongoDB mongo_client = init_mongodb(mongodb_auth) if mongo_client: logger.info( "MongoDB init successfully" ) else : logger.error( "Failed to initialize MongoDB" ) sys.exit( 10 ) if auth_get_entid: get_dbname_list = get_mongodb_dbname() logger.info( "get all dbname list: " + str(get_dbname_list)) else : logger.info( "file get dbname list: " + 
str(get_dbname_list)) for dbname in get_dbname_list: get_end_all_table = get_mongodb_tables(dbname) for table in need_del_table_list: get_one_data_mes = get_one_data(dbname,table) if table in get_end_all_table: get_index_key_tables(dbname,table) else : logger.error(dbname + " not have table: " + table) continue # break #删除按照日期数据 if get_one_data_mes: get_del_timestmap = if_string_type(get_one_data_mes) if dump_mongodb_data(dbname,table,True,get_del_timestmap)[ 0 ] == '0' or is_del_bakcup_data == False: if get_del_timestmap: del_data(dbname,table,get_del_timestmap) else : logger.error( "get del timestmap fail" ) else : if is_del_bakcup_data == False: logger.error( "is_del_bakcup_data seting False, dump mongodb data fail" ) else : logger.error( "dump mongodb data fail, but is del backup data" ) for null_table in need_del_null_table_list: if dump_mongodb_data(dbname,null_table,False, "1" )[ 0 ] == '0' or is_del_bakcup_data == False: if null_table in get_end_all_table: #删除全部历史数据 del_all_data(dbname,null_table) else : logger.error( dbname + " not have table: " + null_table) else : if is_del_bakcup_data == False: logger.error( "is_del_bakcup_data seting False, dump mongodb data fail" ) else : logger.error( "dump mongodb data fail, but is del backup data" ) mongo_client.close() logger.info( "MongoDB closed" ) |
配置文件篇
- 配置项使用说明（概要）：
- 支持删除指定天数之前的数据，可先备份再删除，通过相应配置项控制；
- 同样支持不备份、直接清理删除，也通过相应配置项控制；
- 按配置的字段（need_del_table_field，如 start_time）查询过滤需要删除的数据。
```ini
[DEFAULT]
mongodb_ip = 10.130.47.197
mongodb_port = 40000
mongodb_auth = False
mongodb_user = admin
mongodb_passwd = test@123
mongodb_auth_db = admin
#从全部dbname中过滤掉不需要处理的dbname,使用逗号分隔
need_filter_dbname = local,config,admin
#指定需要按照日期删除的集合,使用逗号分隔
need_del_table_list = new_r_ags_e_back,call_detail_back
#按照日期删除时用于过滤的集合字段
need_del_table_field = start_time
#指定需要清空删除的集合,使用逗号分隔
need_del_null_table_list = call_duration_cache,duration_cache

[main]
#是否自动获取对应mongodb中全部dbname
auth_get_entid = False
#从配置文件中获取dbname
ent_id = 20241205,20250107
#需要删除多少天以前的数据
Del_day = 97
#是否需要备份数据
is_del_bakcup_data = False
#备份目录
bakcup_dir_path = ./data
#备份命令路径
mongodump_command_path = /home/devops/Python/Mongodb_del_history/mongodump
```
脚本运行
- 脚本运行
```
[devops@db1 Mongodb_del_history]$ tar xf Mongodb_del_history.tar.gz
[devops@db1 Mongodb_del_history]$ cd Mongodb_del_history
[devops@db1 Mongodb_del_history]$ nohup ./del_history_data &
2025-01-06 14:15:01 139749303605056 del_history_data.py:24 INFO init mongodb conn: 10.130.47.197:40000
2025-01-06 14:15:01 139749303605056 del_history_data.py:303 INFO MongoDB init successfully
2025-01-06 14:15:01 139749303605056 del_history_data.py:39 INFO delete need filter dbname: local
2025-01-06 14:15:01 139749303605056 del_history_data.py:310 INFO get all dbname list: ['0103290010', '0103290012', '0103290013', '0103290015']
2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: jhk_task_status
2025-01-06 14:15:01 139749303605056 del_history_data.py:321 ERROR 0103290010 not have table: sd_call_detail_back
2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/Python/Mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c call_duration_cache -o ./data
2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']
2025-01-06 14:15:01 139749303605056 del_history_data.py:229 INFO run command: /home/devops/Python/Mongodb_del_history/mongodump -h 10.130.47.197:40000 -d 0103290010 -c duration_cache -o ./data
2025-01-06 14:15:01 139749303605056 del_history_data.py:233 INFO mongodump command result: ['0']
2025-01-06 14:15:01 139749303605056 del_history_data.py:347 INFO MongoDB closed
```
二进制文件程序下载
- 使用链接下载
```
wget https://zhao138969.com/LinuxPackage/Python/del_history_data
```
到此这篇关于批量清理mongodb历史数据的方法详解的文章就介绍到这了,更多相关清理mongodb历史数据内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部!