Zinsightszinsights-client archivecCs.zt|od|vWSty(YdS0dS)Nz insights-F)tarfileZ is_tarfile Exception)clsZarc_pathrA/usr/lib/python3.9/site-packages/sos/cleaner/archives/insights.py check_is_types zInsightsArchive.check_is_typecCs4|jdddd}|jjjdkr0d|}|S)N/z.tarr.z./)Z archive_pathsplittarobjZ firstmembername)selftoprrrget_archive_rootsz InsightsArchive.get_archive_rootN) __name__ __module__ __qualname____doc__ type_name description classmethodr rrrrrrs  r)rZsos.cleaner.archivesrrrrrr s archives/__pycache__/insights.cpython-39.pyc000064400000002130151116317160015037 0ustar00a \hO@s(ddlZddlmZGdddeZdS)N)SoSObfuscationArchivec@s,eZdZdZdZdZeddZddZdS) InsightsArchivezbThis class represents archives generated by the insights-client utility for RHEL systems. Zinsightszinsights-client archivecCs.zt|od|vWSty(YdS0dS)Nz insights-F)tarfileZ is_tarfile Exception)clsZarc_pathrA/usr/lib/python3.9/site-packages/sos/cleaner/archives/insights.py check_is_types zInsightsArchive.check_is_typecCs4|jdddd}|jjjdkr0d|}|S)N/z.tarr.z./)Z archive_pathsplittarobjZ firstmembername)selftoprrrget_archive_rootsz InsightsArchive.get_archive_rootN) __name__ __module__ __qualname____doc__ type_name description classmethodr rrrrrrs  r)rZsos.cleaner.archivesrrrrrr s archives/__pycache__/sos.cpython-39.opt-1.pyc000064400000005647151116317160014772 0ustar00a \h @s`ddlZddlZddlmZGdddeZGdddeZGdddeZGd d d eZdS) N)SoSObfuscationArchivec@s$eZdZdZdZdZeddZdS)SoSReportArchivezThis is the class representing an sos report, or in other words the type the archive the SoS project natively generates reportzsos report archivecCs.zt|od|vWSty(YdS0dS)N sosreport-Ftarfile is_tarfile Exceptionclsarc_pathr s  archives/__pycache__/sos.cpython-39.pyc000064400000005647151116317160014033 0ustar00a \h @s`ddlZddlZddlmZGdddeZGdddeZGdddeZGd d d eZdS) N)SoSObfuscationArchivec@s$eZdZdZdZdZeddZdS)SoSReportArchivezThis is the class representing an sos report, or in other words the type the archive the SoS project natively generates reportzsos report archivecCs.zt|od|vWSty(YdS0dS)N sosreport-Ftarfile is_tarfile Exceptionclsarc_pathr s  archives/__init__.py000064400000054224151116317160010472 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import logging import os import shutil import stat import tarfile import tempfile import re from concurrent.futures import ProcessPoolExecutor from sos.utilities import file_is_binary # python older than 3.8 will hit a pickling error when we go to spawn a new # process for extraction if this method is a part of the SoSObfuscationArchive # class. So, the simplest solution is to remove it from the class. def extract_archive(archive_path, tmpdir): with tarfile.open(archive_path) as archive: path = os.path.join(tmpdir, 'cleaner') # set extract filter since python 3.12 (see PEP-706 for more) # Because python 3.10 and 3.11 raises false alarms as exceptions # (see #3330 for examples), we can't use data filter but must # fully trust the archive (legacy behaviour) archive.extraction_filter = getattr(tarfile, 'fully_trusted_filter', (lambda member, path: member)) # Guard against "Arbitrary file write during tarfile extraction" # Checks the extracted files don't stray out of the target directory. 
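        # Illustrative sketch of the guard below (hypothetical values, not
        # executed): a malicious member name such as '../../evil' resolves
        # outside the extraction directory,
        #   path       = '/tmp/sos-tmp/cleaner'
        #   abs_target = '/tmp/evil'
        # so os.path.commonprefix() no longer equals abs_directory and the
        # loop raises instead of extracting that member.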
        for member in archive.getmembers():
            member_path = os.path.join(path, member.name)
            abs_directory = os.path.abspath(path)
            abs_target = os.path.abspath(member_path)
            prefix = os.path.commonprefix([abs_directory, abs_target])
            if prefix != abs_directory:
                raise Exception(f"Attempted path traversal in tarfile: "
                                f"{prefix} != {abs_directory}")
            archive.extract(member, path)
    return os.path.join(path, archive.name.split('/')[-1].split('.tar')[0])


class SoSObfuscationArchive():
    """A representation of an extracted archive or an sos archive build
    directory which is used by SoSCleaner.

    Each archive that needs to be obfuscated is loaded into an instance of
    this class. All report-level operations should be contained within this
    class.
    """

    files_obfuscated_count = 0
    total_sub_count = 0
    removed_file_count = 0
    type_name = 'undetermined'
    description = 'undetermined'
    is_nested = False
    prep_files = {}

    def __init__(self, archive_path, tmpdir, keep_binary_files):
        self.archive_path = archive_path
        self.final_archive_path = self.archive_path
        self.tmpdir = tmpdir
        self.archive_name = self.archive_path.split('/')[-1].split('.tar')[0]
        self.ui_name = self.archive_name
        self.soslog = logging.getLogger('sos')
        self.ui_log = logging.getLogger('sos_ui')
        self.skip_list = self._load_skip_list()
        self.is_extracted = False
        self._load_self()
        self.archive_root = ''
        self.keep_binary_files = keep_binary_files
        self.parsers = ()
        self.log_info(
            f"Loaded {self.archive_path} as type {self.description}"
        )

    def obfuscate_string(self, string_data):
        for parser in self.parsers:
            try:
                string_data = parser.parse_string_for_keys(string_data)
            except Exception as err:
                self.log_info(f"Error obfuscating string data: {err}")
        return string_data

    # TODO: merge content to obfuscate_arc_files as that is the only place we
    # call obfuscate_filename ?
    def obfuscate_filename(self, short_name, filename):
        _ob_short_name = self.obfuscate_string(short_name.split('/')[-1])
        _ob_filename = short_name.replace(short_name.split('/')[-1],
                                          _ob_short_name)

        if _ob_filename != short_name:
            arc_path = filename.split(short_name)[0]
            _ob_path = os.path.join(arc_path, _ob_filename)
            # ensure that any plugin subdirs that contain obfuscated strings
            # get created with obfuscated counterparts
            if not os.path.islink(filename):
                os.rename(filename, _ob_path)
            else:
                # generate the obfuscated name of the link target
                _target_ob = self.obfuscate_string(os.readlink(filename))
                # remove the unobfuscated original symlink first, in case the
                # symlink name hasn't changed but the target has
                os.remove(filename)
                # create the newly obfuscated symlink, pointing to the
                # obfuscated target name, which may not exist just yet, but
                # when the actual file is obfuscated, will be created
                os.symlink(_target_ob, _ob_path)

    def set_parsers(self, parsers):
        self.parsers = parsers

    # TODO: include this in __init__?
    def load_parser_entries(self):
        for parser in self.parsers:
            parser.load_map_entries()

    def obfuscate_line(self, line, parsers=None):
        """Run a line through each of the obfuscation parsers, keeping a
        cumulative total of substitutions done on that particular line.

        Positional arguments:

            :param line str:        The raw line as read from the file being
                                    processed
            :param parsers:         A list of parser objects to obfuscate
                                    with. If None, use all.

Returns the fully obfuscated line and the number of substitutions made """ # don't iterate over blank lines, but still write them to the tempfile # to maintain the same structure when we write a scrubbed file back count = 0 if not line.strip(): return line, count if parsers is None: parsers = self.parsers for parser in parsers: try: line, _count = parser.parse_line(line) count += _count except Exception as err: self.log_debug(f"failed to parse line: {err}", parser.name) return line, count def obfuscate_arc_files(self, flist): for filename in flist: self.log_debug(f" pid={os.getpid()}: obfuscating {filename}") try: short_name = filename.split(self.archive_name + '/')[1] if self.should_skip_file(short_name): continue if (not self.keep_binary_files and self.should_remove_file(short_name)): # We reach this case if the option --keep-binary-files # was not used, and the file is in a list to be removed self.remove_file(short_name) continue if (self.keep_binary_files and (file_is_binary(filename) or self.should_remove_file(short_name))): # We reach this case if the option --keep-binary-files # is used. In this case we want to make sure # the cleaner doesn't try to clean a binary file continue if os.path.islink(filename): # don't run the obfuscation on the link, but on the actual # file at some other point. continue _parsers = [ _p for _p in self.parsers if not any( _skip.match(short_name) for _skip in _p.skip_patterns ) ] if not _parsers: self.log_debug( f"Skipping obfuscation of {short_name or filename} " f"due to matching file skip pattern" ) continue self.log_debug(f"Obfuscating {short_name or filename}") subs = 0 with tempfile.NamedTemporaryFile(mode='w', dir=self.tmpdir) \ as tfile: with open(filename, 'r', encoding='utf-8', errors='replace') as fname: for line in fname: try: line, cnt = self.obfuscate_line(line, _parsers) subs += cnt tfile.write(line) except Exception as err: self.log_debug(f"Unable to obfuscate " f"{short_name}: {err}") tfile.seek(0) if subs: shutil.copyfile(tfile.name, filename) self.update_sub_count(subs) self.obfuscate_filename(short_name, filename) except Exception as err: self.log_debug(f" pid={os.getpid()}: caught exception on " f"obfuscating file {filename}: {err}") return (self.files_obfuscated_count, self.total_sub_count, self.removed_file_count) @classmethod def check_is_type(cls, arc_path): """Check if the archive is a well-known type we directly support""" raise NotImplementedError @property def is_sos(self): return 'sos' in self.__class__.__name__.lower() @property def is_insights(self): return 'insights' in self.type_name def _load_self(self): if self.is_tarfile: # pylint: disable=consider-using-with self.tarobj = tarfile.open(self.archive_path) def get_nested_archives(self): """Return a list of ObfuscationArchives that represent additional archives found within the target archive. For example, an archive from `sos collect` will return a list of ``SoSReportArchive`` objects. This should be overridden by individual types of ObfuscationArchive's """ return [] def get_archive_root(self): """Set the root path for the archive that should be prepended to any filenames given to methods in this class. 
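
        Illustrative example (hypothetical names): a tarball whose first
        member is the directory 'sosreport-node1-2024-01-01-abcdef' would
        yield that directory name as the root, while a plain directory
        target falls back to the absolute path of the archive itself.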
""" if self.is_tarfile: toplevel = self.tarobj.firstmember if toplevel.isdir(): return toplevel.name return os.path.dirname(toplevel.name) or os.sep return os.path.abspath(self.archive_path) def report_msg(self, msg): """Helper to easily format ui messages on a per-report basis""" self.ui_log.info(f"{self.ui_name + ' :':<50} {msg}") def _fmt_log_msg(self, msg, caller=None): return f"[cleaner{f':{caller}' if caller else ''}" \ f"[{self.archive_name}]] {msg}" def log_debug(self, msg, caller=None): self.soslog.debug(self._fmt_log_msg(msg, caller)) def log_info(self, msg, caller=None): self.soslog.info(self._fmt_log_msg(msg, caller)) def log_error(self, msg, caller=None): self.soslog.error(self._fmt_log_msg(msg, caller)) def _load_skip_list(self): """Provide a list of files and file regexes to skip obfuscation on Returns: list of files and file regexes """ return [ 'proc/kallsyms', 'sosreport-', 'sys/firmware', 'sys/fs', 'sys/kernel/debug', 'sys/module' ] @property def is_tarfile(self): try: return tarfile.is_tarfile(self.archive_path) except Exception: return False def remove_file(self, fname): """Remove a file from the archive. This is used when cleaner encounters a binary file, which we cannot reliably obfuscate. """ full_fname = self.get_file_path(fname) # don't call a blank remove() here if full_fname: self.log_info(f"Removing binary file '{fname}' from archive") os.remove(full_fname) self.removed_file_count += 1 def format_file_name(self, fname): """Based on the type of archive we're dealing with, do whatever that archive requires to a provided **relative** filepath to be able to access it within the archive """ if not self.is_extracted: if not self.archive_root: self.archive_root = self.get_archive_root() return os.path.join(self.archive_root, fname) return os.path.join(self.extracted_path, fname) def get_file_content(self, fname): """Return the content from the specified fname. Particularly useful for tarball-type archives so we can retrieve prep file contents prior to extracting the entire archive """ if self.is_extracted is False and self.is_tarfile: filename = self.format_file_name(fname) try: return self.tarobj.extractfile(filename).read().decode('utf-8') except KeyError: self.log_debug( f"Unable to retrieve {fname}: no such file in archive" ) return '' else: try: with open(self.format_file_name(fname), 'r', encoding='utf-8') as to_read: return to_read.read() except Exception as err: self.log_debug(f"Failed to get contents of {fname}: {err}") return '' def extract(self, quiet=False): if self.is_tarfile: if not quiet: self.report_msg("Extracting...") self.extracted_path = self.extract_self() self.is_extracted = True self.tarobj = None # we can't pickle this & not further needed else: self.extracted_path = self.archive_path # if we're running as non-root (e.g. collector), then we can have a # situation where a particular path has insufficient permissions for # us to rewrite the contents and/or add it to the ending tarfile. 
# Unfortunately our only choice here is to change the permissions # that were preserved during report collection if os.getuid() != 0: self.log_debug('Verifying permissions of archive contents') for dirname, dirs, files in os.walk(self.extracted_path): try: for _dir in dirs: _dirname = os.path.join(dirname, _dir) _dir_perms = os.stat(_dirname).st_mode os.chmod(_dirname, _dir_perms | stat.S_IRWXU) for filename in files: fname = os.path.join(dirname, filename) # protect against symlink race conditions if not os.path.exists(fname) or os.path.islink(fname): continue if (not os.access(fname, os.R_OK) or not os.access(fname, os.W_OK)): self.log_debug( "Adding owner rw permissions to " f"{fname.split(self.archive_path)[-1]}" ) os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR) except Exception as err: self.log_debug(f"Error while trying to set perms: {err}") self.log_debug(f"Extracted path is {self.extracted_path}") def rename_top_dir(self, new_name): """Rename the top-level directory to new_name, which should be an obfuscated string that scrubs the hostname from the top-level dir which would be named after the unobfuscated sos report """ _path = self.extracted_path.replace(self.archive_name, new_name) self.archive_name = new_name os.rename(self.extracted_path, _path) self.extracted_path = _path def get_compression(self): """Return the compression type used by the archive, if any. This is then used by SoSCleaner to generate a policy-derived compression command to repack the archive """ if self.is_tarfile: if self.archive_path.endswith('xz'): return 'xz' return 'gz' return None def build_tar_file(self, method): """Pack the extracted archive as a tarfile to then be re-compressed """ mode = 'w' tarpath = self.extracted_path + '-obfuscated.tar' compr_args = {} if method: mode += f":{method}" tarpath += f".{method}" if method == 'xz': compr_args = {'preset': 3} else: compr_args = {'compresslevel': 6} self.log_debug(f"Building tar file {tarpath}") with tarfile.open(tarpath, mode=mode, **compr_args) as tar: tar.add(self.extracted_path, arcname=os.path.split(self.archive_name)[1]) return tarpath def compress(self, method): """Execute the compression command, and set the appropriate final archive path for later reference by SoSCleaner on a per-archive basis """ try: self.final_archive_path = self.build_tar_file(method) except Exception as err: self.log_debug(f"Exception while re-compressing archive: {err}") raise self.log_debug(f"Compressed to {self.final_archive_path}") try: self.remove_extracted_path() except Exception as err: self.log_debug(f"Failed to remove extraction directory: {err}") self.report_msg('Failed to remove temporary extraction directory') def remove_extracted_path(self): """After the tarball has been re-compressed, remove the extracted path so that we don't take up that duplicate space any longer during execution """ try: self.log_debug(f"Removing {self.extracted_path}") shutil.rmtree(self.extracted_path) except OSError: os.chmod(self.extracted_path, stat.S_IWUSR) if os.path.isfile(self.extracted_path): os.remove(self.extracted_path) else: shutil.rmtree(self.extracted_path) def extract_self(self): """Extract an archive into our tmpdir so that we may inspect it or iterate through its contents for obfuscation """ with ProcessPoolExecutor(1) as _pool: _path_future = _pool.submit(extract_archive, self.archive_path, self.tmpdir) path = _path_future.result() return path def get_symlinks(self): """Iterator for a list of symlinks in the archive""" for dirname, dirs, files in 
os.walk(self.extracted_path): for _dir in dirs: _dirpath = os.path.join(dirname, _dir) if os.path.islink(_dirpath): yield _dirpath for filename in files: _fname = os.path.join(dirname, filename) if os.path.islink(_fname): yield _fname def get_files(self): """Iterator for a list of files in the archive, to allow clean to iterate over. Will not include symlinks, as those are handled separately """ for dirname, _, files in os.walk(self.extracted_path): for filename in files: _fname = os.path.join(dirname, filename.lstrip('/')) if not os.path.islink(_fname): yield _fname def get_directory_list(self): """Return a list of all directories within the archive""" dir_list = [] for dirname, _, _ in os.walk(self.extracted_path): dir_list.append(dirname) return dir_list def update_sub_count(self, count): """Called when a file has finished being parsed and used to track total substitutions made and number of files that had changes made """ self.files_obfuscated_count += 1 self.total_sub_count += count def get_file_path(self, fname): """Return the filepath of a specific file within the archive so that it may be selectively inspected if it exists """ _path = os.path.join(self.extracted_path, fname.lstrip('/')) return _path if os.path.exists(_path) else '' def should_skip_file(self, filename): """Checks the provided filename against a list of filepaths to not perform obfuscation on, as defined in self.skip_list Positional arguments: :param filename str: Filename relative to the extracted archive root """ if (not os.path.isfile(self.get_file_path(filename)) and not os.path.islink(self.get_file_path(filename))): return True for _skip in self.skip_list: if filename.startswith(_skip) or re.match(_skip, filename): return True return False def should_remove_file(self, fname): """Determine if the file should be removed or not, due to an inability to reliably obfuscate that file based on the filename. :param fname: Filename relative to the extracted archive root :type fname: ``str`` :returns: ``True`` if the file cannot be reliably obfuscated :rtype: ``bool`` """ obvious_removes = [ r'.*\.gz$', # TODO: support flat gz/xz extraction r'.*\.xz$', r'.*\.bzip2$', r'.*\.tar\..*', # TODO: support archive unpacking r'.*\.txz$', r'.*\.tgz$', r'.*\.bin$', r'.*\.journal$', r'.*\~$' ] # if the filename matches, it is obvious we can remove them without # doing the read test for _arc_reg in obvious_removes: if re.match(_arc_reg, fname): return True _full_path = self.get_file_path(fname) if os.path.isfile(_full_path): return file_is_binary(_full_path) # don't fail on dir-level symlinks return False # vim: set et ts=4 sw=4 : archives/generic.py000064400000002666151116317160010352 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. 
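
# Usage sketch (illustrative only -- SoSCleaner normally performs this type
# detection and construction itself). The classes below can be probed with
# their check_is_type() classmethods and then driven like any other
# obfuscation archive, assuming the caller supplies a tmpdir and the
# keep_binary_files flag:
#
#   for klass in (DataDirArchive, TarballArchive):
#       if klass.check_is_type('/tmp/some-unassociated-target'):
#           archive = klass('/tmp/some-unassociated-target', '/var/tmp',
#                           keep_binary_files=False)
#           archive.extract(quiet=True)
#           break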
import os import tarfile from sos.cleaner.archives import SoSObfuscationArchive class DataDirArchive(SoSObfuscationArchive): """A plain directory on the filesystem that is not directly associated with any known or supported collection utility """ type_name = 'data_dir' description = 'unassociated directory' @classmethod def check_is_type(cls, arc_path): return os.path.isdir(arc_path) def set_archive_root(self): return os.path.abspath(self.archive_path) class TarballArchive(SoSObfuscationArchive): """A generic tar archive that is not associated with any known or supported collection utility """ type_name = 'tarball' description = 'unassociated tarball' @classmethod def check_is_type(cls, arc_path): try: return tarfile.is_tarfile(arc_path) except Exception: return False def set_archive_root(self): if self.tarobj.firstmember.isdir(): return self.tarobj.firstmember.name return '' archives/insights.py000064400000002117151116317160010555 0ustar00# Copyright 2021 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import tarfile from sos.cleaner.archives import SoSObfuscationArchive class InsightsArchive(SoSObfuscationArchive): """This class represents archives generated by the insights-client utility for RHEL systems. """ type_name = 'insights' description = 'insights-client archive' @classmethod def check_is_type(cls, arc_path): try: return tarfile.is_tarfile(arc_path) and 'insights-' in arc_path except Exception: return False def get_archive_root(self): top = self.archive_path.split('/')[-1].split('.tar')[0] if self.tarobj.firstmember.name == '.': top = './' + top return top archives/sos.py000064400000005600151116317160007531 0ustar00# Copyright 2021 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import os import tarfile from sos.cleaner.archives import SoSObfuscationArchive class SoSReportArchive(SoSObfuscationArchive): """This is the class representing an sos report, or in other words the type the archive the SoS project natively generates """ type_name = 'report' description = 'sos report archive' @classmethod def check_is_type(cls, arc_path): try: return tarfile.is_tarfile(arc_path) and 'sosreport-' in arc_path except Exception: return False class SoSReportDirectory(SoSReportArchive): """This is the archive class representing a build directory, or in other words what `sos report --clean` will end up using for in-line obfuscation """ type_name = 'report_dir' description = 'sos report directory' @classmethod def check_is_type(cls, arc_path): if os.path.isdir(arc_path): return 'sos_logs' in os.listdir(arc_path) return False class SoSCollectorArchive(SoSObfuscationArchive): """Archive class representing the tarball created by ``sos collect``. 
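
    (Illustrative note: such a tarball wraps the individual ``sosreport-*``
    tarballs gathered from each node, which is why ``is_nested`` is set and
    ``get_nested_archives()`` below returns them as separate report archives.)
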
It will not provide prep files on its own, however it will provide a list of SoSReportArchive's which will then be used to prep the parsers """ type_name = 'collect' description = 'sos collect tarball' is_nested = True @classmethod def check_is_type(cls, arc_path): try: return (tarfile.is_tarfile(arc_path) and 'sos-collect' in arc_path) except Exception: return False def get_nested_archives(self): self.extract(quiet=True) _path = self.extracted_path archives = [] for fname in os.listdir(_path): arc_name = os.path.join(_path, fname) if 'sosreport-' in fname and tarfile.is_tarfile(arc_name): archives.append(SoSReportArchive(arc_name, self.tmpdir, self.keep_binary_files)) return archives class SoSCollectorDirectory(SoSCollectorArchive): """The archive class representing the temp directory used by ``sos collect`` when ``--clean`` is used during runtime. """ type_name = 'collect_dir' description = 'sos collect directory' @classmethod def check_is_type(cls, arc_path): if os.path.isdir(arc_path): for fname in os.listdir(arc_path): if 'sos-collector-' in fname: return True return False # vim: set et ts=4 sw=4 : mappings/__pycache__/__init__.cpython-39.opt-1.pyc000064400000014332151116317160015726 0ustar00a \h@s6ddlZddlZddlZddlmZGdddZdS)N)Pathc@s|eZdZdZgZgZdZdZdZddZ ddZ dd Z d d Z d d Z ddZddZddZddZddZddZdS)SoSMapzStandardized way to store items with their obfuscated counterparts. Each type of sanitization that SoSCleaner supports should have a corresponding SoSMap() object, to allow for easy retrieval of obfuscated items. TFcCsJi|_t|_g|_|jj|_||_t j |jd|j|_ | dS)NZ cleaner_cache)datasetset _regexes_madecompiled_regexes __class____name__lowerZcnameworkdirospathjoin cache_dir load_entries)selfr rA/usr/lib/python3.9/site-packages/sos/cleaner/mappings/__init__.py__init__ s zSoSMap.__init__cCs"t|jjddd|ddS)ai Load cached entries from the disk. This method must be called when we initialize a Map instance and whenever we want to retrieve self.dataset (e.g. to store default_mapping file). The later is essential since a concurrent Map can add more objects to the cache, so we need to update self.dataset up to date. Keep in mind that size of self.dataset is usually bigger than number of files in the corresponding cleaner's directory: directory contains just whole items (e.g. IP addresses) while dataset contains more derived objects (e.g. subnets). T)parentsexist_okN)rrmkdirload_new_entries_from_dir)rrrrr,s zSoSMap.load_entriescCsX|r.||jvs.||jvs.|jr2t|dkr2dS|jD]}t||tjr8dSq8dS)zSome items need to be completely ignored, for example link-local or loopback addresses should not be obfuscated TF) skip_keysrvaluesignore_short_itemslenignore_matchesrematchI)ritemskiprrr ignore_item<s  zSoSMap.ignore_itemcCsFz|||j|<Wnty0||j|<Yn0|jrB||dS)N) sanitize_itemr Exceptioncompile_regexesadd_regex_itemrr#rrradd_sanitised_item_to_datasetHs  z$SoSMap.add_sanitised_item_to_datasetcCsd}|dkrtj|j|}tj|rd}t|ddd}|}Wdn1sZ0Y|j|ds|| ||d7}tj|j|}q|d8}|d7}qdS)Nrrutf-8encodingFr) r r rrisfileopenreadrgetr+)rcounterZ no_files_cntfnamefr#rrrrPs & z SoSMap.load_new_entries_from_dircCs||r|Sd}|j|ds|sltj|jd}t|jddd}||Wdn1sb0Yz8t |jd}t |jt j |j|||Wqty||Yq0q|j|S)zAdd a particular item to the map, generating an obfuscated pair for it. 
Positional arguments: :param item: The plaintext object to obfuscate NF)dirwr.r/r)r%rr4tempfileZNamedTemporaryFilerr2namewriterr linkr rr+FileExistsErrorr)rr#Ztmpfiler7r5rrraddfs" ( z SoSMap.addcCsR||rdS||jvrN|j||j|||f|jjdddddS)zAdd an item to the regexes dict and then re-sort the list that the parsers will use during parse_line() :param item: The unobfuscated item to generate a regex for :type item: ``str`` NcSs t|dS)Nr)r)xrrrz'SoSMap.add_regex_item..T)keyreverse)r%rr?rappendget_regex_resultsortr*rrrr)s    zSoSMap.add_regex_itemcCs2|jrdt|d}n t|}t|tjS)a9Generate the object/value that is used by the parser when iterating over pre-generated regexes during parse_line(). For most parsers this will simply be a ``re.Pattern()`` object, but for more complex parsers this can be overridden to provide a different object, e.g. a tuple, for that parer's specific iteration needs. :param item: The unobfuscated string to generate the regex for :type item: ``str`` :returns: A compiled regex pattern for the item :rtype: ``re.Pattern`` z (? s mappings/__pycache__/__init__.cpython-39.pyc000064400000014332151116317160014767 0ustar00a \h@s6ddlZddlZddlZddlmZGdddZdS)N)Pathc@s|eZdZdZgZgZdZdZdZddZ ddZ dd Z d d Z d d Z ddZddZddZddZddZddZdS)SoSMapzStandardized way to store items with their obfuscated counterparts. Each type of sanitization that SoSCleaner supports should have a corresponding SoSMap() object, to allow for easy retrieval of obfuscated items. TFcCsJi|_t|_g|_|jj|_||_t j |jd|j|_ | dS)NZ cleaner_cache)datasetset _regexes_madecompiled_regexes __class____name__lowerZcnameworkdirospathjoin cache_dir load_entries)selfr rA/usr/lib/python3.9/site-packages/sos/cleaner/mappings/__init__.py__init__ s zSoSMap.__init__cCs"t|jjddd|ddS)ai Load cached entries from the disk. This method must be called when we initialize a Map instance and whenever we want to retrieve self.dataset (e.g. to store default_mapping file). The later is essential since a concurrent Map can add more objects to the cache, so we need to update self.dataset up to date. Keep in mind that size of self.dataset is usually bigger than number of files in the corresponding cleaner's directory: directory contains just whole items (e.g. IP addresses) while dataset contains more derived objects (e.g. subnets). T)parentsexist_okN)rrmkdirload_new_entries_from_dir)rrrrr,s zSoSMap.load_entriescCsX|r.||jvs.||jvs.|jr2t|dkr2dS|jD]}t||tjr8dSq8dS)zSome items need to be completely ignored, for example link-local or loopback addresses should not be obfuscated TF) skip_keysrvaluesignore_short_itemslenignore_matchesrematchI)ritemskiprrr ignore_item<s  zSoSMap.ignore_itemcCsFz|||j|<Wnty0||j|<Yn0|jrB||dS)N) sanitize_itemr Exceptioncompile_regexesadd_regex_itemrr#rrradd_sanitised_item_to_datasetHs  z$SoSMap.add_sanitised_item_to_datasetcCsd}|dkrtj|j|}tj|rd}t|ddd}|}Wdn1sZ0Y|j|ds|| ||d7}tj|j|}q|d8}|d7}qdS)Nrrutf-8encodingFr) r r rrisfileopenreadrgetr+)rcounterZ no_files_cntfnamefr#rrrrPs & z SoSMap.load_new_entries_from_dircCs||r|Sd}|j|ds|sltj|jd}t|jddd}||Wdn1sb0Yz8t |jd}t |jt j |j|||Wqty||Yq0q|j|S)zAdd a particular item to the map, generating an obfuscated pair for it. 
Positional arguments: :param item: The plaintext object to obfuscate NF)dirwr.r/r)r%rr4tempfileZNamedTemporaryFilerr2namewriterr linkr rr+FileExistsErrorr)rr#Ztmpfiler7r5rrraddfs" ( z SoSMap.addcCsR||rdS||jvrN|j||j|||f|jjdddddS)zAdd an item to the regexes dict and then re-sort the list that the parsers will use during parse_line() :param item: The unobfuscated item to generate a regex for :type item: ``str`` NcSs t|dS)Nr)r)xrrrz'SoSMap.add_regex_item..T)keyreverse)r%rr?rappendget_regex_resultsortr*rrrr)s    zSoSMap.add_regex_itemcCs2|jrdt|d}n t|}t|tjS)a9Generate the object/value that is used by the parser when iterating over pre-generated regexes during parse_line(). For most parsers this will simply be a ``re.Pattern()`` object, but for more complex parsers this can be overridden to provide a different object, e.g. a tuple, for that parer's specific iteration needs. :param item: The unobfuscated string to generate the regex for :type item: ``str`` :returns: A compiled regex pattern for the item :rtype: ``re.Pattern`` z (? s mappings/__pycache__/hostname_map.cpython-39.opt-1.pyc000064400000014631151116317160016644 0ustar00a \h(@s(ddlZddlmZGdddeZdS)N)SoSMapcseZdZdZgdZddgZdZdZdZdZ dZ iZ iZ dd Z fd d Zd d ZddZfddZddZddZddZddZZS)SoSHostnameMapaLMapping store for hostnames and domain names Hostnames are obfuscated using an incrementing counter based on the total number of hosts matched regardless of domain name. Domain names are obfuscated based on the host's hostname, plus any user defined domains passed in by the `--domains` options. Domains are obfuscated as whole units, meaning the domains 'example.com' and 'host.foo.example.com' will be separately obfuscated with no relation for example as 'obfuscatedomdain1.com' and 'obfuscatedomain2.com'. Top-level domains are left untouched. )Z localhostz.*localdomain.*z^com..*ZwwwZapi) z.yamlz.ymlz.crtz.keyz.pemz.logz.repoz.rulesz.confz.cfgTrcCs|jD]\}}t|ddkr@|j||j|dd<q |dr`|dd|j|<q d|ddd}|s~q |jD]J\}}d|ddd}||krd|ddd}||j|<qq |dS)zBecause we use 'intermediary' dicts for host names and domain names in this parser, we need to re-inject entries from the map_file into these dicts and not just the underlying 'dataset' dict .robfuscateddomainN) datasetitemslensplithosts startswith_domainsjoinset_initial_counts)selfdomainZob_pairZ_domain_to_injectZexisting_domainvalue _existingZ _ob_domainrE/usr/lib/python3.9/site-packages/sos/cleaner/mappings/hostname_map.pyload_domains_from_map5s z$SoSHostnameMap.load_domains_from_mapcs d|vr|dd}t|S)zOverride the base get_regex_result() to provide a regex that, if this is an FQDN or a straight domain, will include an underscore formatted regex as well. 
rz(\.|_))replacesuperget_regex_result)ritem __class__rrrSs zSoSHostnameMap.get_regex_resultcCsz2t|jddddd}t|d|_WntyDYn0z{z?SoSHostnameMap.domain_name_in_loaded_domains..F)rr rr r any)rrrrr+rdomain_name_in_loaded_domainsps   z,SoSHostnameMap.domain_name_in_loaded_domainsc sd}d}d}|dr0||d7}|dd}q |drT||d7}|dd}q0||jvrh|j|S||sd|||gS||jrd|dd}||d}||7}||jvrt |j dt dD]}d }||}|d} | dr| d|j vrd}t |dks|ds*q|s^|ddsL||r^t |}q||drz2||dd} |dt | }WqWqtyYq0q|st |}|||S) N)r_rrrrT)rkeyF)r r)rr/lowerr strip_extsr rr keysr r rget Exception) rrprefixsuffixfinalextrZ _host_substr_testZ_hZitmrrrr6sV                zSoSHostnameMap.getc Cs|d}t|dkr(||dSt|dkr\||}tdd|DrX|}|St|dkr|d}|dd}t|dkr||}nd}||}||j|<d||g}tdd|Dr|}|SdS) Nrrrcss|]}|VqdSr(isupperr*r&rrrr,r-z/SoSHostnameMap.sanitize_item..unknowncss|]}|VqdSr(r>r@rrrr,r-) r r sanitize_short_namer3sanitize_domainallupperrr) rrrdnamehostnamerZ ob_hostname ob_domainZ_fqdnrrr sanitize_items*         zSoSHostnameMap.sanitize_itemcCs^|r||jvr|S||jvrTd|j}||j|<|jd7_||j|<|||j|S)zObfuscate the short name of the host with an incremented counter based on the total number of obfuscated host names rr) skip_keysrr#r Zadd_regex_item)rrGZob_hostrrrrBs     z"SoSHostnameMap.sanitize_short_namecCsz|jD]$}t|d|rd|Sq|d}d|dd}||}d||g}||jd|<|S)zeObfuscate the domainname, broken out into subdomains. Top-level domains are ignored. rrr)ignore_matchesrematchrr3_new_obfuscated_domainr)rrZ_skipZ top_domainrFrHrrrrCs   zSoSHostnameMap.sanitize_domaincCs4||jvr*d|j|j|<|jd7_|j|S)zDGenerate an obfuscated domain for each subdomain name given rr)rr%)rrFrrrrNs z%SoSHostnameMap._new_obfuscated_domain)__name__ __module__ __qualname____doc__rKrJr4Zignore_short_itemsZmatch_full_words_onlyr#r%rr rrrr/r6rIrBrCrN __classcell__rrrrrs*  = r)rLZsos.cleaner.mappingsrrrrrr s mappings/__pycache__/hostname_map.cpython-39.pyc000064400000014631151116317160015705 0ustar00a \h(@s(ddlZddlmZGdddeZdS)N)SoSMapcseZdZdZgdZddgZdZdZdZdZ dZ iZ iZ dd Z fd d Zd d ZddZfddZddZddZddZddZZS)SoSHostnameMapaLMapping store for hostnames and domain names Hostnames are obfuscated using an incrementing counter based on the total number of hosts matched regardless of domain name. Domain names are obfuscated based on the host's hostname, plus any user defined domains passed in by the `--domains` options. Domains are obfuscated as whole units, meaning the domains 'example.com' and 'host.foo.example.com' will be separately obfuscated with no relation for example as 'obfuscatedomdain1.com' and 'obfuscatedomain2.com'. Top-level domains are left untouched. )Z localhostz.*localdomain.*z^com..*ZwwwZapi) z.yamlz.ymlz.crtz.keyz.pemz.logz.repoz.rulesz.confz.cfgTrcCs|jD]\}}t|ddkr@|j||j|dd<q |dr`|dd|j|<q d|ddd}|s~q |jD]J\}}d|ddd}||krd|ddd}||j|<qq |dS)zBecause we use 'intermediary' dicts for host names and domain names in this parser, we need to re-inject entries from the map_file into these dicts and not just the underlying 'dataset' dict .robfuscateddomainN) datasetitemslensplithosts startswith_domainsjoinset_initial_counts)selfdomainZob_pairZ_domain_to_injectZexisting_domainvalue _existingZ _ob_domainrE/usr/lib/python3.9/site-packages/sos/cleaner/mappings/hostname_map.pyload_domains_from_map5s z$SoSHostnameMap.load_domains_from_mapcs d|vr|dd}t|S)zOverride the base get_regex_result() to provide a regex that, if this is an FQDN or a straight domain, will include an underscore formatted regex as well. 
rz(\.|_))replacesuperget_regex_result)ritem __class__rrrSs zSoSHostnameMap.get_regex_resultcCsz2t|jddddd}t|d|_WntyDYn0z{z?SoSHostnameMap.domain_name_in_loaded_domains..F)rr rr r any)rrrrr+rdomain_name_in_loaded_domainsps   z,SoSHostnameMap.domain_name_in_loaded_domainsc sd}d}d}|dr0||d7}|dd}q |drT||d7}|dd}q0||jvrh|j|S||sd|||gS||jrd|dd}||d}||7}||jvrt |j dt dD]}d }||}|d} | dr| d|j vrd}t |dks|ds*q|s^|ddsL||r^t |}q||drz2||dd} |dt | }WqWqtyYq0q|st |}|||S) N)r_rrrrT)rkeyF)r r)rr/lowerr strip_extsr rr keysr r rget Exception) rrprefixsuffixfinalextrZ _host_substr_testZ_hZitmrrrr6sV                zSoSHostnameMap.getc Cs|d}t|dkr(||dSt|dkr\||}tdd|DrX|}|St|dkr|d}|dd}t|dkr||}nd}||}||j|<d||g}tdd|Dr|}|SdS) Nrrrcss|]}|VqdSr(isupperr*r&rrrr,r-z/SoSHostnameMap.sanitize_item..unknowncss|]}|VqdSr(r>r@rrrr,r-) r r sanitize_short_namer3sanitize_domainallupperrr) rrrdnamehostnamerZ ob_hostname ob_domainZ_fqdnrrr sanitize_items*         zSoSHostnameMap.sanitize_itemcCs^|r||jvr|S||jvrTd|j}||j|<|jd7_||j|<|||j|S)zObfuscate the short name of the host with an incremented counter based on the total number of obfuscated host names rr) skip_keysrr#r Zadd_regex_item)rrGZob_hostrrrrBs     z"SoSHostnameMap.sanitize_short_namecCsz|jD]$}t|d|rd|Sq|d}d|dd}||}d||g}||jd|<|S)zeObfuscate the domainname, broken out into subdomains. Top-level domains are ignored. rrr)ignore_matchesrematchrr3_new_obfuscated_domainr)rrZ_skipZ top_domainrFrHrrrrCs   zSoSHostnameMap.sanitize_domaincCs4||jvr*d|j|j|<|jd7_|j|S)zDGenerate an obfuscated domain for each subdomain name given rr)rr%)rrFrrrrNs z%SoSHostnameMap._new_obfuscated_domain)__name__ __module__ __qualname____doc__rKrJr4Zignore_short_itemsZmatch_full_words_onlyr#r%rr rrrr/r6rIrBrCrN __classcell__rrrrrs*  = r)rLZsos.cleaner.mappingsrrrrrr s mappings/__pycache__/ip_map.cpython-39.opt-1.pyc000064400000012523151116317160015434 0ustar00a \h7 @s(ddlZddlmZGdddeZdS)N)SoSMapc@speZdZdZgdZiZdZgdZdZdZ ddZ d d Z d d Z d dZ ddZddZddZddZdS)SoSIPMapaA mapping store for IP addresses Each IP address added to this map is chcked for subnet membership. If that subnet already exists in the map, then IP addresses are deterministically generated sequentially within that subnet. For example, if a given IP is matched to subnet 192.168.1.0/24 then 192.168.1 may be obfuscated to 100.11.12.0/24. Each IP address in the original 192.168.1.0/24 subnet will then be assigned an address in 100.11.12.0/24 sequentially, such as 100.11.12.1, 100.11.12.2, etc... Internally, the ipaddress library is used to manipulate the address objects however, when retrieved by SoSCleaner any values will be strings. )z127.*z::1z0\.(.*)?z1\.(.*)?z8.8.8.8z8.8.4.4z 169.254.*z255.*d)Z127Z169Z172Z192Fl"XcCs>|jD].}t|jdddd|ddkr dSq dS)aThere are multiple ways in which an ip address could be handed to us in a way where we're matching against a previously obfuscated address. 
Here, match the ip address to any of the obfuscated addresses we've already created /)maxsplitrTF)datasetvaluesstrsplit)selfZipaddr_ipr?/usr/lib/python3.9/site-packages/sos/cleaner/mappings/ip_map.py ip_in_dataset5s$zSoSIPMap.ip_in_datasetcCsd}||r|d|}||jvr2|j|S||sF||rJ|Sd|vr|jD]$\}}||r\|ddSq\||S)zEnsure that when requesting an obfuscated address, we return a str object instead of an IPv(4|6)Address object )r=])rr) startswithlstripjoinrZ ignore_itemritemsr add)r itemZ filt_startkeyvaluerrrgetAs    z SoSIPMap.getcCs`g}|jD]0}|j|jkr&||_dS|j|vr ||q |r\|jdddd|d|_dS)a/Determine if a given address is in a subnet of an already obfuscated network and if it is, then set the address' network to the network object we're tracking. This allows us to match ip addresses with or without a CIDR notation and maintain proper network relationships. NcSs|jS)N) prefixlen)nrrrlz;SoSIPMap.set_ip_cidr_from_existing_subnet..T)rreverser) _networksipbroadcast_addressnetworkappendsort)r addrnetsnetrrr set_ip_cidr_from_existing_subnetZs    z)SoSIPMap.set_ip_cidr_from_existing_subnetcCsdzt|}Wn ty.|j|Yn0|j}t|jdkrP||n | || |S)ziGiven an IP address, sanitize it to an obfuscated network or host address as appropriate z255.255.255.255) ipaddress ip_interface ValueErrorignore_matchesr'r&r netmaskr,sanitize_networksanitize_ipaddr)r rr)r&rrr sanitize_itemos    zSoSIPMap.sanitize_itemcCs||jvr||dS)zObfuscate the network address provided, and if there are host bits in the address then obfuscate those as well N)r#_new_obfuscated_network)r r&rrrr2s zSoSIPMap.sanitize_networkcCsh|j|jvr`|j|j}|j|jjkr0t|jS|D]&}||s8t|d|jSq8|S)zEObfuscate the IP address within the known obfuscated network r) r&r#r$r%r hostsrr_new_obfuscated_single_address)r r) _obf_networkr rrrr3s     zSoSIPMap.sanitize_ipaddrcCs|jd7_|jddvr,|jd7_q|jd?d|jd?dd|jd?dd|jd}||jvr||S|S)Nr)r.) _saddr_cntrr r7)r Z_addrrrrr7s z'SoSIPMap._new_obfuscated_single_addresscCsd}t|tjrh|j|jvr*|jd7_|jd}|jdd}t|d|}|jd7_t|tjrt|r||j|<t ||j t |<dS)a Generate an obfuscated network address for the network address given which will allow us to maintain network relationships without divulging actual network details Positional arguments: :param network: An ipaddress.IPv{4|6)Network object Nrz.0.0.0r) isinstancer- IPv4Networknetwork_first_octetskip_network_octets with_netmaskr IPv6Networkr#r r)r r&r8Z _obf_addressZ _obf_maskrrrr5s      z SoSIPMap._new_obfuscated_networkN)__name__ __module__ __qualname____doc__r0r#rBrCZcompile_regexesr?rrr,r4r2r3r7r5rrrrrs   r)r-Zsos.cleaner.mappingsrrrrrr s mappings/__pycache__/ip_map.cpython-39.pyc000064400000012523151116317160014475 0ustar00a \h7 @s(ddlZddlmZGdddeZdS)N)SoSMapc@speZdZdZgdZiZdZgdZdZdZ ddZ d d Z d d Z d dZ ddZddZddZddZdS)SoSIPMapaA mapping store for IP addresses Each IP address added to this map is chcked for subnet membership. If that subnet already exists in the map, then IP addresses are deterministically generated sequentially within that subnet. For example, if a given IP is matched to subnet 192.168.1.0/24 then 192.168.1 may be obfuscated to 100.11.12.0/24. Each IP address in the original 192.168.1.0/24 subnet will then be assigned an address in 100.11.12.0/24 sequentially, such as 100.11.12.1, 100.11.12.2, etc... Internally, the ipaddress library is used to manipulate the address objects however, when retrieved by SoSCleaner any values will be strings. 
)z127.*z::1z0\.(.*)?z1\.(.*)?z8.8.8.8z8.8.4.4z 169.254.*z255.*d)Z127Z169Z172Z192Fl"XcCs>|jD].}t|jdddd|ddkr dSq dS)aThere are multiple ways in which an ip address could be handed to us in a way where we're matching against a previously obfuscated address. Here, match the ip address to any of the obfuscated addresses we've already created /)maxsplitrTF)datasetvaluesstrsplit)selfZipaddr_ipr?/usr/lib/python3.9/site-packages/sos/cleaner/mappings/ip_map.py ip_in_dataset5s$zSoSIPMap.ip_in_datasetcCsd}||r|d|}||jvr2|j|S||sF||rJ|Sd|vr|jD]$\}}||r\|ddSq\||S)zEnsure that when requesting an obfuscated address, we return a str object instead of an IPv(4|6)Address object )r=])rr) startswithlstripjoinrZ ignore_itemritemsr add)r itemZ filt_startkeyvaluerrrgetAs    z SoSIPMap.getcCs`g}|jD]0}|j|jkr&||_dS|j|vr ||q |r\|jdddd|d|_dS)a/Determine if a given address is in a subnet of an already obfuscated network and if it is, then set the address' network to the network object we're tracking. This allows us to match ip addresses with or without a CIDR notation and maintain proper network relationships. NcSs|jS)N) prefixlen)nrrrlz;SoSIPMap.set_ip_cidr_from_existing_subnet..T)rreverser) _networksipbroadcast_addressnetworkappendsort)r addrnetsnetrrr set_ip_cidr_from_existing_subnetZs    z)SoSIPMap.set_ip_cidr_from_existing_subnetcCsdzt|}Wn ty.|j|Yn0|j}t|jdkrP||n | || |S)ziGiven an IP address, sanitize it to an obfuscated network or host address as appropriate z255.255.255.255) ipaddress ip_interface ValueErrorignore_matchesr'r&r netmaskr,sanitize_networksanitize_ipaddr)r rr)r&rrr sanitize_itemos    zSoSIPMap.sanitize_itemcCs||jvr||dS)zObfuscate the network address provided, and if there are host bits in the address then obfuscate those as well N)r#_new_obfuscated_network)r r&rrrr2s zSoSIPMap.sanitize_networkcCsh|j|jvr`|j|j}|j|jjkr0t|jS|D]&}||s8t|d|jSq8|S)zEObfuscate the IP address within the known obfuscated network r) r&r#r$r%r hostsrr_new_obfuscated_single_address)r r) _obf_networkr rrrr3s     zSoSIPMap.sanitize_ipaddrcCs|jd7_|jddvr,|jd7_q|jd?d|jd?dd|jd?dd|jd}||jvr||S|S)Nr)r.) _saddr_cntrr r7)r Z_addrrrrr7s z'SoSIPMap._new_obfuscated_single_addresscCsd}t|tjrh|j|jvr*|jd7_|jd}|jdd}t|d|}|jd7_t|tjrt|r||j|<t ||j t |<dS)a Generate an obfuscated network address for the network address given which will allow us to maintain network relationships without divulging actual network details Positional arguments: :param network: An ipaddress.IPv{4|6)Network object Nrz.0.0.0r) isinstancer- IPv4Networknetwork_first_octetskip_network_octets with_netmaskr IPv6Networkr#r r)r r&r8Z _obf_addressZ _obf_maskrrrr5s      z SoSIPMap._new_obfuscated_networkN)__name__ __module__ __qualname____doc__r0r#rBrCZcompile_regexesr?rrr,r4r2r3r7r5rrrrrs   r)r-Zsos.cleaner.mappingsrrrrrr s mappings/__pycache__/ipv6_map.cpython-39.opt-1.pyc000064400000024137151116317160015714 0ustar00a \h./@s6ddlZddlmZGdddeZGdddZdS)N)SoSMapc@sDeZdZdZiZgdZdgZdZdZddZ dd Z dd d Z d S) SoSIPv6MapatMapping for IPv6 addresses and networks. Much like the IP map handles IPv4 addresses, this map is designed to take IPv6 strings and obfuscate them consistently to maintain network topology. To do this, addresses will be manipulated by the ipaddress library. If an IPv6 address is encountered without a netmask, it is assumed to be a /64 address. )z^::1/.*z::/0zfd53:.*z^53..:534fFcCsd|vr dS|dD]x}t|}|d|d}|||}|j|j|j<|d|dD].}|d|d|}|||||j|<q\qdS)zOverride the base conf_update() so that we can load the existing networks into ObfuscatedIPv6Network() objects for the current run. 
networksN obfuscatedhosts) ipaddress ip_network _get_networkobfuscated_addressdatasetoriginal_addressadd_obfuscated_host_address)selfZconfignetworkZ_origZ _obfuscated_nethost_ob_hostrA/usr/lib/python3.9/site-packages/sos/cleaner/mappings/ipv6_map.py conf_update)s    zSoSIPv6Map.conf_updatecCsd|vr|ddnd}|}|s*|d7}zt|}||}|j}Wnbtytj|dd}||}|j|jvr|j|j|j<t |dd}| |}Yn0|rd|vr|d|S|S)N/z/64F)strictr) splitr r r r ValueError network_addrr r ip_addressobfuscate_host_address)ritem_prefixZ_ipaddr_addrrZ _hostaddrrrr sanitize_item9s$       zSoSIPv6Map.sanitize_itemrcCs.|j}||jvr$t|||j|j|<|j|S)zAttempt to find an existing ObfuscatedIPv6Network object from which to either find an existing obfuscated match, or create a new one. If no such object already exists, create it. ) compressedrObfuscatedIPv6Network first_hexes)raddressrr#rrrr Vs   zSoSIPv6Map._get_networkN)r) __name__ __module__ __qualname____doc__rZignore_matchesr'Zcompile_regexesversionrr$r rrrrrs rc@sneZdZdZiZdddZeddZedd Zd d Z d d Z ddZ ddZ ddZ ddZddZdS)r&aAn abstraction class that represents a network that is (to be) handled by sos. Each distinct IPv6 network that we encounter will have a representative instance of this class, from which new obfuscated subnets and host addresses will be generated. This class should be built from an ``ipaddress.IPv6Network`` object. If an obfuscation string is not passed, one will be created during init. rNcCst|tjstd||_|j|_|jj|_ i|_ |durDdg|_ n||_ |sZ| |_ n,t|tsvtdt||dd|_ dS)aBasic setup for the obfuscated network. Minor validation on the addr used to create the instance, as well as on an optional ``obfuscation`` which if set, will serve as the obfuscated_network address. :param addr: The *un*obfuscated network to be handled :type addr: ``ipaddress.IPv6Network`` :param obfuscation: An optional pre-determined string representation of the obfuscated network address :type obfuscation: ``str`` :param used_hexes: A list of already used hexes for the first hextet of a potential global address obfuscation :type used_hexes: ``list`` z*Invalid network: not an IPv6Network objectNrz;Pre-determined obfuscated network address must be str, not rr) isinstancer IPv6Network Exceptionaddr prefixlenprefixnetwork_addressr%rrr'_obfuscate_network_address_obfuscated_networkstr TypeErrortyper)rr1Z obfuscationZ used_hexesrrr__init__qs      zObfuscatedIPv6Network.__init__cCs|jd|jS)Nr)r6r3rrrrr sz(ObfuscatedIPv6Network.obfuscated_addresscCs|jjSN)r1r%r;rrrrsz&ObfuscatedIPv6Network.original_addresscsfdd|DS)a#Generate a set of obfuscated hextets, based on the length of the source hextet. If any hextets are compressed, keep that compression. E.G. '::1234:bcd' will generate a leading empty '' hextet, followed by two 4-character hextets, e.g. '::0005:0006'. :param hextets: The extracted hextets from a source address :type hextets: ``list`` :returns: A set of generated hextets for use in an obfuscated address :rtype: ``list`` csg|]}|rdndqS)r) obfuscate_hex).0hr;rr z:ObfuscatedIPv6Network.generate_hextets..r)rhextetsrr;rgenerate_hextetssz&ObfuscatedIPv6Network.generate_hextetscCs,|j|dd}||j|<|d|dS)aGenerate a string of size length of hex characters. Due to the need of deterministic generation in concurrent cleaner, generation starts from zero values and is incremented by one (for a given length). 
mappings/__init__.py
# Copyright 2020 Red Hat, Inc. Jake Hunsaker
# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
# # See the LICENSE file in the source distribution for further information. import re import os import tempfile from pathlib import Path class SoSMap(): """Standardized way to store items with their obfuscated counterparts. Each type of sanitization that SoSCleaner supports should have a corresponding SoSMap() object, to allow for easy retrieval of obfuscated items. """ # used for regex skips in parser.parse_line() ignore_matches = [] # used for filename obfuscations in parser.parse_string_for_keys() skip_keys = [] compile_regexes = True ignore_short_items = False match_full_words_only = False def __init__(self, workdir): self.dataset = {} self._regexes_made = set() self.compiled_regexes = [] self.cname = self.__class__.__name__.lower() # workdir's default value '/tmp' is used just by avocado tests, # otherwise we override it to /etc/sos/cleaner (or map_file dir) self.workdir = workdir self.cache_dir = os.path.join(self.workdir, 'cleaner_cache', self.cname) self.load_entries() def load_entries(self): """ Load cached entries from the disk. This method must be called when we initialize a Map instance and whenever we want to retrieve self.dataset (e.g. to store default_mapping file). The later is essential since a concurrent Map can add more objects to the cache, so we need to update self.dataset up to date. Keep in mind that size of self.dataset is usually bigger than number of files in the corresponding cleaner's directory: directory contains just whole items (e.g. IP addresses) while dataset contains more derived objects (e.g. subnets). """ Path(self.cache_dir).mkdir(parents=True, exist_ok=True) self.load_new_entries_from_dir(1) def ignore_item(self, item): """Some items need to be completely ignored, for example link-local or loopback addresses should not be obfuscated """ if not item or item in self.skip_keys or item in self.dataset.values()\ or (self.ignore_short_items and len(item) <= 3): return True for skip in self.ignore_matches: if re.match(skip, item, re.I): return True return False def add_sanitised_item_to_dataset(self, item): try: self.dataset[item] = self.sanitize_item(item) except Exception: self.dataset[item] = item if self.compile_regexes: self.add_regex_item(item) def load_new_entries_from_dir(self, counter): # this is a performance hack; there can be gaps in counter values as # e.g. sanitised item #14 is an IP address (in file) while item #15 # is its network (in dataset but not in files). So the next file # number is 16. The diffs should be at most 2, the above is so far # the only type of "underneath dataset growth". But let be # conservative and test next 5 numbers "only". no_files_cnt = 5 while no_files_cnt > 0: fname = os.path.join(self.cache_dir, f"{counter}") while os.path.isfile(fname): no_files_cnt = 5 with open(fname, 'r', encoding='utf-8') as f: item = f.read() if not self.dataset.get(item, False): self.add_sanitised_item_to_dataset(item) counter += 1 fname = os.path.join(self.cache_dir, f"{counter}") # no next file, but try a new next ones until no_files_cnt==0 no_files_cnt -= 1 counter += 1 def add(self, item): """Add a particular item to the map, generating an obfuscated pair for it. 
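        A minimal usage sketch (illustrative only; the exact obfuscated
        strings depend on the subclass in use and on any entries already
        present in the cleaner cache directory):

            # e.g. with the keyword subclass, assuming an empty cache dir
            from sos.cleaner.mappings.keyword_map import SoSKeywordMap
            kmap = SoSKeywordMap(workdir='/tmp')
            kmap.add('secretproject')   # e.g. 'obfuscatedword0'
            kmap.add('secretproject')   # repeat calls return the same pair
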
Positional arguments: :param item: The plaintext object to obfuscate """ if self.ignore_item(item): return item tmpfile = None while not self.dataset.get(item, False): if not tmpfile: # pylint: disable=consider-using-with tmpfile = tempfile.NamedTemporaryFile(dir=self.cache_dir) with open(tmpfile.name, 'w', encoding='utf-8') as f: f.write(item) try: counter = len(self.dataset) + 1 os.link(tmpfile.name, os.path.join(self.cache_dir, f"{counter}")) self.add_sanitised_item_to_dataset(item) except FileExistsError: self.load_new_entries_from_dir(counter) return self.dataset[item] def add_regex_item(self, item): """Add an item to the regexes dict and then re-sort the list that the parsers will use during parse_line() :param item: The unobfuscated item to generate a regex for :type item: ``str`` """ if self.ignore_item(item): return if item not in self._regexes_made: # save the item in a set to avoid clobbering existing regexes, # as searching this set is significantly faster than searching # through the actual compiled_regexes list, especially for very # large collections of entries self._regexes_made.add(item) # add the item, Pattern tuple directly to the compiled_regexes list # and then sort the existing list, rather than rebuild the list # from scratch every time we add something like we would do if we # tracked/saved the item and the Pattern() object in a dict or in # the set above self.compiled_regexes.append((item, self.get_regex_result(item))) self.compiled_regexes.sort(key=lambda x: len(x[0]), reverse=True) def get_regex_result(self, item): """Generate the object/value that is used by the parser when iterating over pre-generated regexes during parse_line(). For most parsers this will simply be a ``re.Pattern()`` object, but for more complex parsers this can be overridden to provide a different object, e.g. a tuple, for that parer's specific iteration needs. :param item: The unobfuscated string to generate the regex for :type item: ``str`` :returns: A compiled regex pattern for the item :rtype: ``re.Pattern`` """ if self.match_full_words_only: item = rf'(? # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import re from sos.cleaner.mappings import SoSMap class SoSHostnameMap(SoSMap): """Mapping store for hostnames and domain names Hostnames are obfuscated using an incrementing counter based on the total number of hosts matched regardless of domain name. Domain names are obfuscated based on the host's hostname, plus any user defined domains passed in by the `--domains` options. Domains are obfuscated as whole units, meaning the domains 'example.com' and 'host.foo.example.com' will be separately obfuscated with no relation for example as 'obfuscatedomdain1.com' and 'obfuscatedomain2.com'. Top-level domains are left untouched. 
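
    A rough usage sketch (values are illustrative only; the actual host and
    domain counters depend on entries already cached from previous runs, and
    '/tmp' is just an assumed scratch workdir):

        from sos.cleaner.mappings.hostname_map import SoSHostnameMap
        hmap = SoSHostnameMap(workdir='/tmp')
        hmap.add('node1.example.com')   # e.g. 'host0.obfuscateddomain0.com'
        hmap.add('foo.example.com')     # e.g. 'host1.obfuscateddomain0.com'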
""" ignore_matches = [ 'localhost', '.*localdomain.*', '^com..*' ] skip_keys = [ 'www', 'api' ] strip_exts = ('.yaml', '.yml', '.crt', '.key', '.pem', '.log', '.repo', '.rules', '.conf', '.cfg') ignore_short_items = True match_full_words_only = True host_count = 0 domain_count = 0 _domains = {} hosts = {} def load_domains_from_map(self): """Because we use 'intermediary' dicts for host names and domain names in this parser, we need to re-inject entries from the map_file into these dicts and not just the underlying 'dataset' dict """ for domain, ob_pair in self.dataset.items(): if len(domain.split('.')) == 1: self.hosts[domain.split('.')[0]] = self.dataset[domain] else: if ob_pair.startswith('obfuscateddomain'): # directly exact domain matches self._domains[domain] = ob_pair.split('.')[0] continue # strip the host name and trailing top-level domain so that # we in inject the domain properly for later string matching # note: this is artificially complex due to our stance on # preserving TLDs. If in the future the project decides to # obfuscate TLDs as well somehow, then this will all become # much simpler _domain_to_inject = '.'.join(domain.split('.')[1:-1]) if not _domain_to_inject: continue for existing_domain, value in self.dataset.items(): _existing = '.'.join(existing_domain.split('.')[:-1]) if _existing == _domain_to_inject: _ob_domain = '.'.join(value.split('.')[:-1]) self._domains[_domain_to_inject] = _ob_domain self.set_initial_counts() def get_regex_result(self, item): """Override the base get_regex_result() to provide a regex that, if this is an FQDN or a straight domain, will include an underscore formatted regex as well. """ if '.' in item: item = item.replace('.', '(\\.|_)') return super().get_regex_result(item) def set_initial_counts(self): """Set the initial counter for host and domain obfuscation numbers based on what is already present in the mapping. """ # hostnames/short names try: h = sorted(self.hosts.values(), reverse=True)[0].split('host')[1] self.host_count = int(h) + 1 except IndexError: # no hosts loaded yet pass # domain names try: d = sorted(self._domains.values(), reverse=True)[0].split('domain') self.domain_count = int(d[1].split('.')[0]) + 1 except IndexError: # no domains loaded yet pass def domain_name_in_loaded_domains(self, domain): """Check if a potential domain is in one of the domains we've loaded and should be obfuscated """ if domain in self._domains: return True host = domain.split('.') no_tld = '.'.join(domain.split('.')[0:-1]) if len(host) == 1: # don't block on host's shortname return host[0] in self.hosts if any(no_tld.endswith(_d) for _d in self._domains): return True return False def get(self, item): # pylint: disable=too-many-branches prefix = '' suffix = '' final = None # The regex pattern match may include a leading and/or trailing '_' # character due to the need to use word boundary matching, so we need # to strip these from the string during processing, but still keep them # in the returned string to not mangle the string replacement in the # context of the file or filename while item.startswith(('.', '_')): prefix += item[0] item = item[1:] while item.endswith(('.', '_')): suffix += item[-1] item = item[0:-1] if item in self.dataset: return self.dataset[item] if not self.domain_name_in_loaded_domains(item.lower()): # no match => return the original string with optional # leading/trailing '.' or '_' characters return ''.join([prefix, item, suffix]) if item.endswith(self.strip_exts): ext = '.' 
+ item.split('.')[-1] item = item.replace(ext, '') suffix += ext if item not in self.dataset: # try to account for use of '-' in names that include hostnames # and don't create new mappings for each of these for _existing in sorted(self.dataset.keys(), reverse=True, key=len): _host_substr = False _test = item.split(_existing) _h = _existing.split('.') # avoid considering a full FQDN match as a new match off of # the hostname of an existing match if _h[0] and _h[0] in self.hosts: _host_substr = True if len(_test) == 1 or not _test[0]: # does not match existing obfuscation continue if not _host_substr and (_test[0].endswith('.') or item.endswith(_existing)): # new hostname in known domain final = super().get(item) break if item.split(_test[0]): # string that includes existing FQDN obfuscation substring # so, only obfuscate the FQDN part try: itm = item.split(_test[0])[1] final = _test[0] + super().get(itm) break except Exception: # fallback to still obfuscating the entire item pass if not final: final = super().get(item) return prefix + final + suffix def sanitize_item(self, item): host = item.split('.') if len(host) == 1: # we have a shortname for a host return self.sanitize_short_name(host[0].lower()) if len(host) == 2: # we have just a domain name, e.g. example.com dname = self.sanitize_domain(host) if all(h.isupper() for h in host): dname = dname.upper() return dname if len(host) > 2: # we have an FQDN, e.g. foo.example.com hostname = host[0] domain = host[1:] # obfuscate the short name if len(hostname) > 2: ob_hostname = self.sanitize_short_name(hostname.lower()) else: # by best practice it appears the host part of the fqdn was cut # off due to some form of truncating, as such don't obfuscate # short strings that are likely to throw off obfuscation of # unrelated bits and paths ob_hostname = 'unknown' ob_domain = self.sanitize_domain(domain) self.dataset[item] = ob_domain _fqdn = '.'.join([ob_hostname, ob_domain]) if all(h.isupper() for h in host): _fqdn = _fqdn.upper() return _fqdn return None def sanitize_short_name(self, hostname): """Obfuscate the short name of the host with an incremented counter based on the total number of obfuscated host names """ if not hostname or hostname in self.skip_keys: return hostname if hostname not in self.dataset: ob_host = f"host{self.host_count}" self.hosts[hostname] = ob_host self.host_count += 1 self.dataset[hostname] = ob_host self.add_regex_item(hostname) return self.dataset[hostname] def sanitize_domain(self, domain): """Obfuscate the domainname, broken out into subdomains. Top-level domains are ignored. """ for _skip in self.ignore_matches: # don't obfuscate vendor domains if re.match(_skip, '.'.join(domain)): return '.'.join(domain) top_domain = domain[-1].lower() dname = '.'.join(domain[0:-1]).lower() ob_domain = self._new_obfuscated_domain(dname) ob_domain = '.'.join([ob_domain, top_domain]) self.dataset['.'.join(domain)] = ob_domain return ob_domain def _new_obfuscated_domain(self, dname): """Generate an obfuscated domain for each subdomain name given """ if dname not in self._domains: self._domains[dname] = f"obfuscateddomain{self.domain_count}" self.domain_count += 1 return self._domains[dname] mappings/ip_map.py000064400000020067151116317160010210 0ustar00# Copyright 2020 Red Hat, Inc. 
Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import ipaddress from sos.cleaner.mappings import SoSMap class SoSIPMap(SoSMap): """A mapping store for IP addresses Each IP address added to this map is chcked for subnet membership. If that subnet already exists in the map, then IP addresses are deterministically generated sequentially within that subnet. For example, if a given IP is matched to subnet 192.168.1.0/24 then 192.168.1 may be obfuscated to 100.11.12.0/24. Each IP address in the original 192.168.1.0/24 subnet will then be assigned an address in 100.11.12.0/24 sequentially, such as 100.11.12.1, 100.11.12.2, etc... Internally, the ipaddress library is used to manipulate the address objects however, when retrieved by SoSCleaner any values will be strings. """ ignore_matches = [ r'127.*', r'::1', r'0\.(.*)?', r'1\.(.*)?', r'8.8.8.8', r'8.8.4.4', r'169.254.*', r'255.*' ] _networks = {} network_first_octet = 100 skip_network_octets = ['127', '169', '172', '192'] compile_regexes = False # counter for obfuscating a single IP address; the value stands for # 172.17.0.0; we use a private block of IP addresses and ignore # 172.16.0.0/16 block as those addresses are more often used in real # (an attempt to prevent confusion) _saddr_cnt = 2886795264 def ip_in_dataset(self, ipaddr): """There are multiple ways in which an ip address could be handed to us in a way where we're matching against a previously obfuscated address. Here, match the ip address to any of the obfuscated addresses we've already created """ for _ip in self.dataset.values(): if str(ipaddr).split('/', maxsplit=1)[0] == _ip.split('/')[0]: return True return False def get(self, item): """Ensure that when requesting an obfuscated address, we return a str object instead of an IPv(4|6)Address object """ filt_start = ('/', '=', ']', ')') if item.startswith(filt_start): item = item.lstrip(''.join(filt_start)) if item in self.dataset: return self.dataset[item] if self.ignore_item(item) or self.ip_in_dataset(item): return item # it's not in there, but let's make sure we haven't previously added # an address with a CIDR notation and we're now looking for it without # that notation if '/' not in item: for key, value in self.dataset.items(): if key.startswith(item): return value.split('/')[0] # fallback to the default map behavior of adding it fresh return self.add(item) def set_ip_cidr_from_existing_subnet(self, addr): """Determine if a given address is in a subnet of an already obfuscated network and if it is, then set the address' network to the network object we're tracking. This allows us to match ip addresses with or without a CIDR notation and maintain proper network relationships. """ nets = [] for net in self._networks: if addr.ip == net.broadcast_address: addr.network = net return if addr.ip in net: nets.append(net) # assign the address to the smallest network that was matched. 
This is # necessary due to certain files specifying addresses that cause the # ipaddress library to create artificially huge subnets that will # include the actual subnets used by the system if nets: nets.sort(key=lambda n: n.prefixlen, reverse=True) addr.network = nets[0] def sanitize_item(self, item): """Given an IP address, sanitize it to an obfuscated network or host address as appropriate """ try: addr = ipaddress.ip_interface(item) except ValueError: # not an IP, add it to the skip list to avoid flooding logs self.ignore_matches.append(item) raise network = addr.network if str(network.netmask) == '255.255.255.255': # check to see if this IP is in a subnet of an already obfuscated # network and if it has, replace the default /32 netmask that # ipaddress applies to no CIDR-notated addresses self.set_ip_cidr_from_existing_subnet(addr) else: # we have a CIDR notation, so generate an obfuscated network # address and then generate an IP address within that network's # range self.sanitize_network(network) return self.sanitize_ipaddr(addr) def sanitize_network(self, network): """Obfuscate the network address provided, and if there are host bits in the address then obfuscate those as well """ # check if the address is in a network we've already encountered if network not in self._networks: self._new_obfuscated_network(network) def sanitize_ipaddr(self, addr): """Obfuscate the IP address within the known obfuscated network """ # get the obfuscated network object if addr.network in self._networks: _obf_network = self._networks[addr.network] # if the plain address is the broadcast address for it's own # network, then assign the broadcast address for the obfuscated # network if addr.ip == addr.network.broadcast_address: return str(_obf_network.broadcast_address) # otherwise within that obfuscated network grab the next available # address from it for _ip in _obf_network.hosts(): if not self.ip_in_dataset(_ip): # the ipaddress module does not assign the network's # netmask to hosts in the hosts() generator for some reason return f"{str(_ip)}/{_obf_network.prefixlen}" # ip is a single ip address without the netmask return self._new_obfuscated_single_address() def _new_obfuscated_single_address(self): # increment the counter and ignore *.0 and *.255 addresses self._saddr_cnt += 1 while self._saddr_cnt % 256 in (0, 255): self._saddr_cnt += 1 # split the counter value to four octets (i.e. % 256) to get an # obfuscated IP address _addr = f"{self._saddr_cnt >> 24}.{(self._saddr_cnt >> 16) % 256}." 
\ f"{(self._saddr_cnt >> 8) % 256}.{self._saddr_cnt % 256}" if _addr in self.dataset.values(): return self._new_obfuscated_single_address() return _addr def _new_obfuscated_network(self, network): """Generate an obfuscated network address for the network address given which will allow us to maintain network relationships without divulging actual network details Positional arguments: :param network: An ipaddress.IPv{4|6)Network object """ _obf_network = None if isinstance(network, ipaddress.IPv4Network): if self.network_first_octet in self.skip_network_octets: self.network_first_octet += 1 _obf_address = f"{self.network_first_octet}.0.0.0" _obf_mask = network.with_netmask.split('/')[1] _obf_network = ipaddress.IPv4Network(f"{_obf_address}/{_obf_mask}") self.network_first_octet += 1 if isinstance(network, ipaddress.IPv6Network): # TODO: define this pass if _obf_network: self._networks[network] = _obf_network self.dataset[str(network)] = str(_obf_network) mappings/ipv6_map.py000064400000027456151116317160010475 0ustar00# Copyright 2022 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import ipaddress from sos.cleaner.mappings import SoSMap class SoSIPv6Map(SoSMap): """Mapping for IPv6 addresses and networks. Much like the IP map handles IPv4 addresses, this map is designed to take IPv6 strings and obfuscate them consistently to maintain network topology. To do this, addresses will be manipulated by the ipaddress library. If an IPv6 address is encountered without a netmask, it is assumed to be a /64 address. """ networks = {} ignore_matches = [ r'^::1/.*', r'::/0', r'fd53:.*', r'^53..:' ] first_hexes = ['534f'] compile_regexes = False version = 1 def conf_update(self, config): """Override the base conf_update() so that we can load the existing networks into ObfuscatedIPv6Network() objects for the current run. """ if 'networks' not in config: return for network in config['networks']: _orig = ipaddress.ip_network(network) _obfuscated = config['networks'][network]['obfuscated'] _net = self._get_network(_orig, _obfuscated) self.dataset[_net.original_address] = _net.obfuscated_address for host in config['networks'][network]['hosts']: _ob_host = config['networks'][network]['hosts'][host] _net.add_obfuscated_host_address(host, _ob_host) self.dataset[host] = _ob_host def sanitize_item(self, item): _prefix = item.split('/')[-1] if '/' in item else '' _ipaddr = item if not _prefix: # assume a /64 default per protocol _ipaddr += "/64" try: _addr = ipaddress.ip_network(_ipaddr) # ipaddr was an actual network per protocol _net = self._get_network(_addr) _ipaddr = _net.obfuscated_address except ValueError: # A ValueError is raised from the ipaddress module when passing # an address such as 2620:52:0:2d80::4fe/64, which has host bits # '::4fe' set - the /64 is generally interpreted only for network # addresses. 
We use this behavior to properly obfuscate the network # before obfuscating a host address within that network _addr = ipaddress.ip_network(_ipaddr, strict=False) _net = self._get_network(_addr) if _net.network_addr not in self.dataset: self.dataset[_net.original_address] = _net.obfuscated_address # then, get the address within the network _hostaddr = ipaddress.ip_address(_ipaddr.split('/')[0]) _ipaddr = _net.obfuscate_host_address(_hostaddr) if _prefix and '/' not in _ipaddr: return f"{_ipaddr}/{_prefix}" return _ipaddr def _get_network(self, address, obfuscated=''): """Attempt to find an existing ObfuscatedIPv6Network object from which to either find an existing obfuscated match, or create a new one. If no such object already exists, create it. """ _addr = address.compressed if _addr not in self.networks: self.networks[_addr] = ObfuscatedIPv6Network(address, obfuscated, self.first_hexes) return self.networks[_addr] class ObfuscatedIPv6Network(): """An abstraction class that represents a network that is (to be) handled by sos. Each distinct IPv6 network that we encounter will have a representative instance of this class, from which new obfuscated subnets and host addresses will be generated. This class should be built from an ``ipaddress.IPv6Network`` object. If an obfuscation string is not passed, one will be created during init. """ # dict of counters for obfuscated hexes generation ob_counters = {} def __init__(self, addr, obfuscation='', used_hexes=None): """Basic setup for the obfuscated network. Minor validation on the addr used to create the instance, as well as on an optional ``obfuscation`` which if set, will serve as the obfuscated_network address. :param addr: The *un*obfuscated network to be handled :type addr: ``ipaddress.IPv6Network`` :param obfuscation: An optional pre-determined string representation of the obfuscated network address :type obfuscation: ``str`` :param used_hexes: A list of already used hexes for the first hextet of a potential global address obfuscation :type used_hexes: ``list`` """ if not isinstance(addr, ipaddress.IPv6Network): raise Exception('Invalid network: not an IPv6Network object') self.addr = addr self.prefix = addr.prefixlen self.network_addr = addr.network_address.compressed self.hosts = {} if used_hexes is None: self.first_hexes = ['534f'] else: self.first_hexes = used_hexes if not obfuscation: self._obfuscated_network = self._obfuscate_network_address() else: if not isinstance(obfuscation, str): raise TypeError(f"Pre-determined obfuscated network address " f"must be str, not {type(obfuscation)}") self._obfuscated_network = obfuscation.split('/')[0] @property def obfuscated_address(self): return f"{self._obfuscated_network}/{self.prefix}" @property def original_address(self): return self.addr.compressed def generate_hextets(self, hextets): """Generate a set of obfuscated hextets, based on the length of the source hextet. If any hextets are compressed, keep that compression. E.G. '::1234:bcd' will generate a leading empty '' hextet, followed by two 4-character hextets, e.g. '::0005:0006'. :param hextets: The extracted hextets from a source address :type hextets: ``list`` :returns: A set of generated hextets for use in an obfuscated address :rtype: ``list`` """ return [self.obfuscate_hex(4) if h else '' for h in hextets] def obfuscate_hex(self, length): """Generate a string of size length of hex characters. 
Due to the need of deterministic generation in concurrent cleaner, generation starts from zero values and is incremented by one (for a given length). :param length: The number of characters to generate :type length: ``int`` :returns: A string of ``length`` hex characters :rtype: ``str`` """ val = self.ob_counters.get(length, 0) + 1 self.ob_counters[length] = val return f"{val:0{length}x}" def _obfuscate_network_address(self): """Generate the obfuscated pair for the network address. This is determined based on the netmask of the network this class was built on top of. """ if self.addr.is_global: return self._obfuscate_global_address() if self.addr.is_link_local: # link-local addresses are always fe80::/64. This is not sensitive # in itself, and retaining the information that an address is a # link-local address is important for problem analysis, so don't # obfuscate this network information. return self.network_addr if self.addr.is_private: return self._obfuscate_private_address() return self.network_addr def _obfuscate_global_address(self): """Global unicast addresses have a 48-bit global routing prefix and a 16-bit subnet. We set the global routing prefix to a static sos-specific identifier that could never be seen in the wild, '534f:' We then obfuscate the subnet hextet. """ _hextets = self.network_addr.split(':')[1:] _ob_hex = ['534f'] if all(not c for c in _hextets): # we have only a single defined hextet, e.g. ff00::/64, so we need # to not use the standard first-hex identifier or we'll overlap # every similar address obfuscation. # Set the leading bits to 53, but increment upwards from there for # when we exceed 256 networks obfuscated in this manner. _start = 53 + (len(self.first_hexes) // 256) _ob_hex = f"{_start}{self.obfuscate_hex(2)}" while _ob_hex in self.first_hexes: # prevent duplicates _ob_hex = f"{_start}{self.obfuscate_hex(2)}" self.first_hexes.append(_ob_hex) _ob_hex = [_ob_hex] ext = self.generate_hextets(_hextets) _ob_hex.extend(ext) return ':'.join(_ob_hex) def _obfuscate_private_address(self): """The first 8 bits will always be 'fd', the next 40 bits are meant to be a global ID, followed by 16 bits for the subnet. To keep things relatively simply we maintain the first hextet as 'fd53', and then obfuscate any remaining hextets. """ _hextets = self.network_addr.split(':')[1:] _ob_hex = ['fd53'] _ob_hex.extend(self.generate_hextets(_hextets)) return ':'.join(_ob_hex) def obfuscate_host_address(self, addr): """Given an unobfuscated address, generate an obfuscated match for it, and save it to this network for tracking during the execution of clean. Note: another way to do this would be to convert the obfuscated network to bytes, and add a random amount to that based on the number of addresses that the network can support and from that new bytes count craft a new IPv6 address. This has the advantage of absolutely guaranteeing the new address is within the network space (whereas the method employed below could *theoretically* generate an overlapping address), but would in turn remove any ability to compress obfuscated addresses to match the general format/syntax of the address it is replacing. For the moment, it is assumed that being able to maintain a quick mental note of "unobfuscated device ff00::1 is obfuscated device 53ad::a1b2" is more desireable than "ff00::1 is now obfuscated as 53ad::1234:abcd:9876:a1b2:". 
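
        Illustrative sketch only (the generated hextets come from shared,
        deterministic counters, so the exact values below assume a fresh
        process with no prior obfuscations):

            import ipaddress
            net = ObfuscatedIPv6Network(
                ipaddress.ip_network('fd00:abcd:1234::/64'))
            net.obfuscate_host_address(
                ipaddress.ip_address('fd00:abcd:1234::1f'))
            # e.g. 'fd53:0001:0002::0003' - the obfuscated network prefix
            # plus newly generated hextets for the host portion
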
:param addr: The unobfuscated IPv6 address :type addr: ``ipaddress.IPv6Address`` :returns: An obfuscated address within this network :rtype: ``str`` """ def _generate_address(host): return ''.join([ self._obfuscated_network, ':'.join(self.generate_hextets(host.split(':'))) ]) if addr.compressed not in self.hosts: # separate host from the address by removing its network prefix _n = self.network_addr.rstrip(':') _host = addr.compressed[len(_n):].lstrip(':') _ob_host = _generate_address(_host) while _ob_host in self.hosts.values(): _ob_host = _generate_address(_host) self.add_obfuscated_host_address(addr.compressed, _ob_host) return self.hosts[addr.compressed] def add_obfuscated_host_address(self, host, obfuscated): """Adds an obfuscated pair to the class for tracking and ongoing consistency in obfuscation. """ self.hosts[host] = obfuscated mappings/keyword_map.py000064400000002174151116317160011263 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. from sos.cleaner.mappings import SoSMap class SoSKeywordMap(SoSMap): """Mapping store for user provided keywords By default, this map will perform no matching or obfuscation. It relies entirely on the use of the --keywords option by the user. Any keywords provided are then obfuscated into 'obfuscatedwordX', where X is an incrementing integer. """ match_full_words_only = True word_count = 0 def sanitize_item(self, item): if item in self.dataset: return self.dataset[item] _ob_item = f"obfuscatedword{self.word_count}" self.word_count += 1 if _ob_item in self.dataset.values(): return self.sanitize_item(item) return _ob_item mappings/mac_map.py000064400000006033151116317160010335 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import re from sos.cleaner.mappings import SoSMap class SoSMacMap(SoSMap): """Mapping store for MAC addresses MAC addresses added to this map will be broken into two halves, vendor and device like how MAC addresses are normally crafted. For the vendor hextets, obfuscation will take the form of 53:4f:53, or 'SOS' in hex. The following device hextets will be obfuscated by a series of suffixes starting from zeroes. For example a MAC address of '60:55:cb:4b:c9:27' may be obfuscated into '53:4f:53:00:00:1a' or similar. This map supports both 48-bit and 64-bit MAC addresses. 48-bit address may take the form of either: MM:MM:MM:SS:SS:SS MM-MM-MM-SS-SS-SS For 64-bit addresses, the identifier injected by IPv6 standards is used in obfuscated returns. These addresses may take either of these forms: MM:MM:MM:FF:FE:SS:SS:SS MMMM:MMFF:FESS:SSSS All mapped mac addresses are converted to lower case. Dash delimited styles will be converted to colon-delimited style. 
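
    A short usage sketch (the obfuscated values shown are examples only; the
    suffix comes from a counter shared across the cleaner run, and '/tmp' is
    an assumed scratch workdir):

        from sos.cleaner.mappings.mac_map import SoSMacMap
        mmap = SoSMacMap(workdir='/tmp')
        mmap.add('60-55-CB-4B-C9-27')   # e.g. '53:4f:53:00:00:01'
        mmap.get('60:55:cb:4b:c9:27')   # same pair - input is normalized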
""" ignore_matches = [ 'ff:ff:ff:ff:ff:ff', '00:00:00:00:00:00' ] mac_template = '53:4f:53:%s:%s:%s' mac6_template = '53:4f:53:ff:fe:%s:%s:%s' mac6_quad_template = '534f:53ff:fe%s:%s%s' compile_regexes = False ob_hextets_cnt = 0 def add(self, item): item = item.replace('-', ':').lower().strip('=.,').strip() return super().add(item) def get(self, item): item = item.replace('-', ':').lower().strip('=.,').strip() return super().get(item) def sanitize_item(self, item): """Obfuscate the device hextets, and append those to our 'vendor' hextet """ hexdigits = "0123456789abdcef" self.ob_hextets_cnt += 1 # we need to convert the counter to a triple of double hex-digits hextets = [ self.ob_hextets_cnt >> 16, (self.ob_hextets_cnt >> 8) % 256, self.ob_hextets_cnt % 256 ] hextets = tuple(f'{hexdigits[i//16]}{hexdigits[i % 16]}' for i in hextets) # match 64-bit IPv6 MAC addresses matching MM:MM:MM:FF:FE:SS:SS:SS if re.match('(([0-9a-fA-F]{2}:){7}[0-9a-fA-F]{2})', item): return self.mac6_template % hextets # match 64-bit IPv6 MAC addresses matching MMMM:MMFF:FESS:SSSS if re.match('(([0-9a-fA-F]{4}:){3}([0-9a-fA-F]){4})', item): return self.mac6_quad_template % hextets # match 48-bit IPv4 MAC addresses if re.match('([0-9a-fA-F][:_]?){12}', item): return self.mac_template % hextets return None mappings/username_map.py000064400000002215151116317160011412 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. from sos.cleaner.mappings import SoSMap class SoSUsernameMap(SoSMap): """Mapping to store usernames matched from ``lastlog`` output. Usernames are obfuscated as ``obfuscateduserX`` where ``X`` is a counter that gets incremented for every new username found. Note that this specifically obfuscates user_names_ and not UIDs. """ ignore_short_items = True match_full_words_only = True name_count = 0 def sanitize_item(self, item): """Obfuscate a new username not currently found in the map """ ob_name = f"obfuscateduser{self.name_count}" self.name_count += 1 if ob_name in self.dataset.values(): return self.sanitize_item(item.lower()) return ob_name parsers/__pycache__/__init__.cpython-39.opt-1.pyc000064400000014646151116317160015577 0ustar00a \h@sddlZGdddZdS)Nc@szeZdZdZdZgZgZgZgZdZ dZ igfddZ ddZ d d Z d d Zd dZddZddZddZddZdS)SoSCleanerParseraParsers are used to build objects that will take a line as input, parse it for a particular pattern (E.G. IP addresses) and then make any necessary subtitutions by referencing the SoSMap() associated with the parser. Ideally a new parser subclass will only need to set the class level attrs in order to be fully functional. 
zUsername ParserZ username_mapcst||_t||dS)N)rmappingsuper__init__)selfZconfigZworkdirZskip_cleaning_files __class__G/usr/lib/python3.9/site-packages/sos/cleaner/parsers/username_parser.pyrs zSoSUsernameParser.__init__cCs|dfS)Nrr )rliner r r _parse_line!szSoSUsernameParser._parse_line) __name__ __module__ __qualname____doc__nameZ map_file_keyZregex_patternsrr __classcell__r r r r rs  rN)Zsos.cleaner.parsersrZ!sos.cleaner.mappings.username_maprrr r r r  s  parsers/__pycache__/username_parser.cpython-39.pyc000064400000002573151116317160016270 0ustar00a \h5@s,ddlmZddlmZGdddeZdS))SoSCleanerParser)SoSUsernameMapcs8eZdZdZdZdZgZgffdd ZddZZ S)SoSUsernameParseraParser for obfuscating usernames within an sos report archive. Note that this parser does not rely on regex matching directly, like most other parsers do. Instead, usernames are discovered via scraping the collected output of lastlog. As such, we do not discover new usernames later on, and only usernames present in lastlog output will be obfuscated, and those passed via the --usernames option if one is provided. zUsername ParserZ username_mapcst||_t||dS)N)rmappingsuper__init__)selfZconfigZworkdirZskip_cleaning_files __class__G/usr/lib/python3.9/site-packages/sos/cleaner/parsers/username_parser.pyrs zSoSUsernameParser.__init__cCs|dfS)Nrr )rliner r r _parse_line!szSoSUsernameParser._parse_line) __name__ __module__ __qualname____doc__nameZ map_file_keyZregex_patternsrr __classcell__r r r r rs  rN)Zsos.cleaner.parsersrZ!sos.cleaner.mappings.username_maprrr r r r  s  parsers/__init__.py000064400000015266151116317160010350 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. # pylint: disable=no-member import re class SoSCleanerParser(): """Parsers are used to build objects that will take a line as input, parse it for a particular pattern (E.G. IP addresses) and then make any necessary subtitutions by referencing the SoSMap() associated with the parser. Ideally a new parser subclass will only need to set the class level attrs in order to be fully functional. 
:param conf_file: The configuration file to read from :type conf_file: ``str`` :cvar name: The parser name, used in logging errors :vartype name: ``str`` :cvar regex_patterns: A list of regex patterns to iterate over for every line processed :vartype regex_patterns: ``list`` :cvar mapping: Used by the parser to store and obfuscate matches :vartype mapping: ``SoSMap()`` :cvar map_file_key: The key in the ``map_file`` to read when loading previous obfuscation matches :vartype map_file_key: ``str`` """ name = 'Undefined Parser' regex_patterns = [] skip_line_patterns = [] parser_skip_files = [] # list of skip files relevant to a parser skip_cleaning_files = [] # list of global skip files from cmdline args map_file_key = 'unset' compile_regexes = True def __init__(self, config={}, skip_cleaning_files=[]): if self.map_file_key in config: self.mapping.conf_update(config[self.map_file_key]) self.skip_cleaning_files = skip_cleaning_files self._generate_skip_regexes() def load_map_entries(self): self.mapping.load_entries() def _generate_skip_regexes(self): """Generate the regexes for the parser's configured parser_skip_files or global skip_cleaning_files, so that we don't regenerate them on every file being examined for if the parser should skip a given file. """ self.skip_patterns = [] for p in self.parser_skip_files + self.skip_cleaning_files: self.skip_patterns.append(re.compile(p)) def generate_item_regexes(self): """Generate regexes for items the parser will be searching for repeatedly without needing to generate them for every file and/or line we process Not used by all parsers. """ if not self.compile_regexes: return for obitem in self.mapping.dataset: self.mapping.add_regex_item(obitem) def parse_line(self, line): """This will be called for every line in every file we process, so that every parser has a chance to scrub everything. This will first try to identify needed obfuscations for items we have already encountered (if the parser uses compiled regexes that is) and make those substitutions early on. After which, we will then parse the line again looking for new matches. """ count = 0 for skip_pattern in self.skip_line_patterns: if re.match(skip_pattern, line, re.I): return line, count if self.compile_regexes: line, _rcount = self._parse_line_with_compiled_regexes(line) count += _rcount line, _count = self._parse_line(line) count += _count return line, count def _parse_line_with_compiled_regexes(self, line): """Check the provided line against known items we have encountered before and have pre-generated regex Pattern() objects for. 
:param line: The line to parse for possible matches for obfuscation :type line: ``str`` :returns: The obfuscated line and the number of changes made :rtype: ``str``, ``int`` """ count = 0 for item, reg in self.mapping.compiled_regexes: if reg.search(line): line, _count = reg.subn(self.mapping.get(item.lower()), line) count += _count return line, count def _parse_line(self, line): """Check the provided line against the parser regex patterns to try and discover _new_ items to obfuscate :param line: The line to parse for possible matches for obfuscation :type line: ``str`` :returns: The obfsucated line, and the number of changes made :rtype: ``tuple``, ``(str, int))`` """ count = 0 for pattern in self.regex_patterns: matches = [m[0] for m in re.findall(pattern, line, re.I)] if matches: matches.sort(reverse=True, key=len) count += len(matches) for match in matches: match = match.strip() if match in self.mapping.dataset.values(): continue new_match = self.mapping.get(match) if new_match != match: line = line.replace(match, new_match) return line, count def parse_string_for_keys(self, string_data): """Parse a given string for instances of any obfuscated items, without applying the normal regex comparisons first. This is mainly used to obfuscate filenames that have, for example, hostnames in them. Rather than try to regex match the string_data, just use the builtin checks for substrings matching known obfuscated keys :param string_data: The line to be parsed :type string_data: ``str`` :returns: The obfuscated line :rtype: ``str`` """ if self.compile_regexes: for item, reg in self.mapping.compiled_regexes: if reg.search(string_data): string_data = reg.sub(self.mapping.get(item.lower()), string_data) else: for k, ob in sorted(self.mapping.dataset.items(), reverse=True, key=lambda x: len(x[0])): if k in self.mapping.skip_keys: continue if k in string_data: string_data = string_data.replace(k, ob) return string_data def get_map_contents(self): """Get the contents of the mapping used by the parser :returns: All matches and their obfuscate counterparts :rtype: ``dict`` """ return self.mapping.dataset parsers/hostname_parser.py000064400000003115151116317160011771 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import re from sos.cleaner.parsers import SoSCleanerParser from sos.cleaner.mappings.hostname_map import SoSHostnameMap class SoSHostnameParser(SoSCleanerParser): name = 'Hostname Parser' map_file_key = 'hostname_map' regex_patterns = [ r'(((\b|_)[a-zA-Z0-9-\.]{1,200}\.[a-zA-Z]{1,63}(\b|_)))' ] def __init__(self, config, workdir, skip_cleaning_files=[]): self.mapping = SoSHostnameMap(workdir) super().__init__(config, skip_cleaning_files) def parse_line(self, line): """This will be called for every line in every file we process, so that every parser has a chance to scrub everything. We are overriding parent method since we need to swap ordering of _parse_line_with_compiled_regexes and _parse_line calls. 
""" count = 0 for skip_pattern in self.skip_line_patterns: if re.match(skip_pattern, line, re.I): return line, count line, _count = self._parse_line(line) count += _count if self.compile_regexes: line, _rcount = self._parse_line_with_compiled_regexes(line) count += _rcount return line, count parsers/ip_parser.py000064400000003267151116317160010573 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. from sos.cleaner.parsers import SoSCleanerParser from sos.cleaner.mappings.ip_map import SoSIPMap class SoSIPParser(SoSCleanerParser): """Handles parsing for IP addresses""" name = 'IP Parser' regex_patterns = [ # IPv4 with or without CIDR r'((? # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. from sos.cleaner.parsers import SoSCleanerParser from sos.cleaner.mappings.ipv6_map import SoSIPv6Map class SoSIPv6Parser(SoSCleanerParser): """Parser for handling IPv6 networks and addresses""" name = 'IPv6 Parser' map_file_key = 'ipv6_map' regex_patterns = [ # Attention: note that this is a single long regex, not several entries # This is initially based off of two regexes from the Java library # for validating an IPv6 string. However, this is modified to begin and # end with a negative lookbehind to ensure that a substring of 'ed::' # is not extracted from a log message such as 'SomeFuncUsed::ADiffFunc' # that come components may log with. Further, we optionally try to grab # a trailing prefix for the network bits. r"(? # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. from sos.cleaner.parsers import SoSCleanerParser from sos.cleaner.mappings.keyword_map import SoSKeywordMap class SoSKeywordParser(SoSCleanerParser): """Handles parsing for user provided keywords """ name = 'Keyword Parser' map_file_key = 'keyword_map' def __init__(self, config, workdir, skip_cleaning_files=[]): self.mapping = SoSKeywordMap(workdir) super().__init__(config, skip_cleaning_files) def _parse_line(self, line): return line, 0 parsers/mac_parser.py000064400000005416151116317160010721 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import re from sos.cleaner.parsers import SoSCleanerParser from sos.cleaner.mappings.mac_map import SoSMacMap # aa:bb:cc:fe:ff:dd:ee:ff IPV6_REG_8HEX = ( r'((? 
# This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. from sos.cleaner.parsers import SoSCleanerParser from sos.cleaner.mappings.username_map import SoSUsernameMap class SoSUsernameParser(SoSCleanerParser): """Parser for obfuscating usernames within an sos report archive. Note that this parser does not rely on regex matching directly, like most other parsers do. Instead, usernames are discovered via scraping the collected output of lastlog. As such, we do not discover new usernames later on, and only usernames present in lastlog output will be obfuscated, and those passed via the --usernames option if one is provided. """ name = 'Username Parser' map_file_key = 'username_map' regex_patterns = [] def __init__(self, config, workdir, skip_cleaning_files=[]): self.mapping = SoSUsernameMap(workdir) super().__init__(config, skip_cleaning_files) def _parse_line(self, line): return line, 0 __init__.py000064400000115166151116317160006671 0ustar00# Copyright 2020 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import hashlib import json import logging import os import shutil import fnmatch from concurrent.futures import ProcessPoolExecutor from datetime import datetime from pwd import getpwuid import sos.cleaner.preppers from sos import __version__ from sos.component import SoSComponent from sos.cleaner.parsers.ip_parser import SoSIPParser from sos.cleaner.parsers.mac_parser import SoSMacParser from sos.cleaner.parsers.hostname_parser import SoSHostnameParser from sos.cleaner.parsers.keyword_parser import SoSKeywordParser from sos.cleaner.parsers.username_parser import SoSUsernameParser from sos.cleaner.parsers.ipv6_parser import SoSIPv6Parser from sos.cleaner.archives.sos import (SoSReportArchive, SoSReportDirectory, SoSCollectorArchive, SoSCollectorDirectory) from sos.cleaner.archives.generic import DataDirArchive, TarballArchive from sos.cleaner.archives.insights import InsightsArchive from sos.utilities import (get_human_readable, import_module, ImporterHelper) # an auxiliary method to kick off child processes over its instances def obfuscate_arc_files(arc, flist): return arc.obfuscate_arc_files(flist) class SoSCleaner(SoSComponent): """ This function is designed to obfuscate potentially sensitive information from an sos report archive in a consistent and reproducible manner. It may either be invoked during the creation of a report by using the --clean option in the report command, or may be used on an already existing archive by way of 'sos clean'. The target of obfuscation are items such as IP addresses, MAC addresses, hostnames, usernames, and also keywords provided by users via the --keywords and/or --keyword-file options. For every collection made in a report the collection is parsed for such items, and when items are found SoS will generate an obfuscated replacement for it, and in all places that item is found replace the text with the obfuscated replacement mapped to it. 
These mappings are saved locally so that future iterations will maintain the same consistent obfuscation pairing. In the case of IP addresses, support is for IPv4 and IPv6 - effort is made to keep network topology intact so that later analysis is as accurate and easily understandable as possible. If an IP address is encountered that we cannot determine the netmask for, a private IP address from 172.17.0.0/22 range is used instead. For IPv6, note that IPv4-mapped addresses, e.g. ::ffff:10.11.12.13, are NOT supported currently, and will remain unobfuscated. For hostnames, domains are obfuscated as whole units, leaving the TLD in place. For instance, 'example.com' may be obfuscated to 'obfuscateddomain0.com' and 'foo.example.com' may end up being 'obfuscateddomain1.com'. Users will be notified of a 'mapping' file that records all items and the obfuscated counterpart mapped to them for ease of reference later on. This file should be kept private. """ desc = "Obfuscate sensitive networking information in a report" arg_defaults = { 'archive_type': 'auto', 'domains': [], 'disable_parsers': [], 'skip_cleaning_files': [], 'jobs': 4, 'keywords': [], 'keyword_file': None, 'map_file': '/etc/sos/cleaner/default_mapping', 'no_update': False, 'keep_binary_files': False, 'target': '', 'usernames': [] } def __init__(self, parser=None, args=None, cmdline=None, in_place=False, hook_commons=None): if not in_place: # we are running `sos clean` directly super().__init__(parser, args, cmdline) self.from_cmdline = True else: # we are being hooked by either SoSReport or SoSCollector, don't # re-init everything as that will cause issues, but instead load # the needed bits from the calling component self.opts = hook_commons['options'] self.tmpdir = hook_commons['tmpdir'] self.sys_tmp = hook_commons['sys_tmp'] self.policy = hook_commons['policy'] self.manifest = hook_commons['manifest'] self.from_cmdline = False # precede 'report -t' option above 'cleaner --jobs' if not hasattr(self.opts, 'jobs'): self.opts.jobs = self.opts.threads self.opts.archive_type = 'auto' self.soslog = logging.getLogger('sos') self.ui_log = logging.getLogger('sos_ui') # create the tmp subdir here to avoid a potential race condition # when obfuscating a SoSCollector run during archive extraction os.makedirs(os.path.join(self.tmpdir, 'cleaner'), exist_ok=True) self.review_parser_values() self.cleaner_mapping = self.load_map_file() os.umask(0o77) self.in_place = in_place self.hash_name = self.policy.get_preferred_hash_name() self.cleaner_md = self.manifest.components.add_section('cleaner') cleaner_dir = os.path.dirname(self.opts.map_file) \ if self.opts.map_file else '/etc/sos/cleaner' parser_args = [ self.cleaner_mapping, cleaner_dir, self.opts.skip_cleaning_files, ] self.parsers = [ SoSHostnameParser(*parser_args), SoSIPParser(*parser_args), SoSIPv6Parser(*parser_args), SoSMacParser(*parser_args), SoSKeywordParser(*parser_args), SoSUsernameParser(*parser_args), ] for _parser in self.opts.disable_parsers: for _loaded in self.parsers: _temp = _loaded.name.lower().split('parser', maxsplit=1)[0] _loaded_name = _temp.strip() if _parser.lower().strip() == _loaded_name: self.log_info(f"Disabling parser: {_loaded_name}") self.ui_log.warning( f"Disabling the '{_parser}' parser. Be aware that this" " may leave sensitive plain-text data in the archive." 
) self.parsers.remove(_loaded) self.archive_types = [ SoSReportDirectory, SoSReportArchive, SoSCollectorDirectory, SoSCollectorArchive, InsightsArchive, # make sure these two are always last as they are fallbacks DataDirArchive, TarballArchive ] self.nested_archive = None self.log_info( f"Cleaner initialized. From cmdline: {self.from_cmdline}") def _fmt_log_msg(self, msg, caller=None): return f"[cleaner{f':{caller}' if caller else ''}] {msg}" def log_debug(self, msg, caller=None): self.soslog.debug(self._fmt_log_msg(msg, caller)) def log_info(self, msg, caller=None): self.soslog.info(self._fmt_log_msg(msg, caller)) def log_error(self, msg, caller=None): self.soslog.error(self._fmt_log_msg(msg, caller)) @classmethod def display_help(cls, section): section.set_title("SoS Cleaner Detailed Help") section.add_text(cls.__doc__) def load_map_file(self): """Verifies that the map file exists and has usable content. If the provided map file does not exist, or it is empty, we will print a warning and continue on with cleaning building a fresh map """ _conf = {} default_map = '/etc/sos/cleaner/default_mapping' if os.path.isdir(self.opts.map_file): raise Exception(f"Requested map file {self.opts.map_file} is a " "directory") if not os.path.exists(self.opts.map_file): if self.opts.map_file != default_map: self.log_error( f"ERROR: map file {self.opts.map_file} does not exist, " "will not load any obfuscation matches") else: with open(self.opts.map_file, 'r', encoding='utf-8') as mf: try: _conf = json.load(mf) except json.JSONDecodeError: self.log_error("ERROR: Unable to parse map file, json is " "malformed. Will not load any mappings.") except Exception as err: self.log_error("ERROR: Could not load " f"'{self.opts.map_file}': {err}") return _conf def print_disclaimer(self): """When we are directly running `sos clean`, rather than hooking into SoSCleaner via report or collect, print a disclaimer banner """ msg = self._fmt_msg("""\ This command will attempt to obfuscate information that is generally \ considered to be potentially sensitive. Such information includes IP \ addresses, MAC addresses, domain names, and any user-provided keywords. Note that this utility provides a best-effort approach to data obfuscation, \ but it does not guarantee that such obfuscation provides complete coverage of \ all such data in the archive, or that any obfuscation is provided to data that\ does not fit the description above. Users should review any resulting data and/or archives generated or processed \ by this utility for remaining sensitive content before being passed to a \ third party. 
""") self.ui_log.info(f"\nsos clean (version {__version__})\n") self.ui_log.info(msg) if not self.opts.batch: try: input("\nPress ENTER to continue, or CTRL-C to quit.\n") except KeyboardInterrupt: self.ui_log.info("\nExiting on user cancel") self._exit(130) except Exception as e: self._exit(1, e) @classmethod def add_parser_options(cls, parser): parser.usage = 'sos clean|mask TARGET [options]' clean_grp = parser.add_argument_group( 'Cleaner/Masking Options', 'These options control how data obfuscation is performed' ) clean_grp.add_argument('target', metavar='TARGET', help='The directory or archive to obfuscate') clean_grp.add_argument('--archive-type', default='auto', choices=['auto', 'report', 'collect', 'insights', 'data-dir', 'tarball'], help=('Specify what kind of archive the target ' 'was generated as')) clean_grp.add_argument('--domains', action='extend', default=[], help='List of domain names to obfuscate') clean_grp.add_argument('--disable-parsers', action='extend', default=[], dest='disable_parsers', help=('Disable specific parsers, so that those ' 'elements are not obfuscated')) clean_grp.add_argument('--skip-cleaning-files', '--skip-masking-files', action='extend', default=[], dest='skip_cleaning_files', help=('List of files to skip/ignore during ' 'cleaning. Globs are supported.')) clean_grp.add_argument('-j', '--jobs', default=4, type=int, help='Number of concurrent archives to clean') clean_grp.add_argument('--keywords', action='extend', default=[], dest='keywords', help='List of keywords to obfuscate') clean_grp.add_argument('--keyword-file', default=None, dest='keyword_file', help='Provide a file a keywords to obfuscate') clean_grp.add_argument('--map-file', dest='map_file', default='/etc/sos/cleaner/default_mapping', help=('Provide a previously generated mapping ' 'file for obfuscation')) clean_grp.add_argument('--no-update', dest='no_update', default=False, action='store_true', help='Do not update the --map-file with new ' 'mappings from this run') clean_grp.add_argument('--keep-binary-files', default=False, action='store_true', dest='keep_binary_files', help='Keep unprocessable binary files in the ' 'archive instead of removing them') clean_grp.add_argument('--usernames', dest='usernames', default=[], action='extend', help='List of usernames to obfuscate') def set_target_path(self, path): """For use by report and collect to set the TARGET option appropriately so that execute() can be called just as if we were running `sos clean` directly from the cmdline. """ self.opts.target = path def inspect_target_archive(self): """The target path is not a directory, so inspect it for being an archive or an archive of archives. In the event the target path is not an archive, abort. 
""" _arc = None if self.opts.archive_type != 'auto': check_type = self.opts.archive_type.replace('-', '_') for archive in self.archive_types: if archive.type_name == check_type: _arc = archive(self.opts.target, self.tmpdir, self.opts.keep_binary_files) else: for arc in self.archive_types: if arc.check_is_type(self.opts.target): _arc = arc(self.opts.target, self.tmpdir, self.opts.keep_binary_files) break if not _arc: return self.main_archive = _arc self.report_paths.append(_arc) if _arc.is_nested: self.report_paths.extend(_arc.get_nested_archives()) # We need to preserve the top level archive until all # nested archives are processed self.report_paths.remove(_arc) self.nested_archive = _arc if self.nested_archive: self.nested_archive.ui_name = self.nested_archive.description def review_parser_values(self): """Check any values passed to the parsers via the commandline: - For the --domains option, ensure that they are valid for the parser in question. - Convert --skip-cleaning-files from globs to regular expressions. """ for _dom in self.opts.domains: if len(_dom.split('.')) < 2: raise Exception( f"Invalid value '{_dom}' given: --domains values must be " "actual domains" ) self.opts.skip_cleaning_files = [fnmatch.translate(p) for p in self.opts.skip_cleaning_files] def execute(self): """SoSCleaner will begin by inspecting the TARGET option to determine if it is a directory, archive, or archive of archives. In the case of a directory, the default behavior will be to edit the data in place. For an archive will we unpack the archive, iterate over the contents, and then repack the archive. In the case of an archive of archives, such as one from SoSCollector, each archive will be unpacked, cleaned, and repacked and the final top-level archive will then be repacked as well. 
""" self.opts.target = self.opts.target.rstrip('/') self.arc_name = self.opts.target.split('/')[-1].split('.tar')[0] if self.from_cmdline: self.print_disclaimer() self.report_paths = [] if not os.path.exists(self.opts.target): self.ui_log.error("Invalid target: no such file or directory " f"{self.opts.target}") self._exit(1) self.inspect_target_archive() if not self.report_paths: self.ui_log.error("No valid archives or directories found\n") self._exit(1) # we have at least one valid target to obfuscate self.completed_reports = [] # TODO: as we separate mappings and parsers further, do this in a less # janky manner for parser in self.parsers: if parser.name == 'Hostname Parser': parser.mapping.set_initial_counts() self.preload_all_archives_into_maps() self.generate_parser_item_regexes() self.obfuscate_report_paths() if not self.completed_reports: if self.in_place: return None self.ui_log.info("No reports obfuscated, aborting...\n") self._exit(1) self.ui_log.info("\nSuccessfully obfuscated " f"{len(self.completed_reports)} report(s)\n") _map = self.compile_mapping_dict() map_path = self.write_map_for_archive(_map) self.write_map_for_config(_map) self.write_stats_to_manifest() if self.in_place: arc_paths = [a.final_archive_path for a in self.completed_reports] return map_path, arc_paths final_path = None if len(self.completed_reports) > 1: arc_path = self.rebuild_nested_archive() else: arc = self.completed_reports[0] arc_path = arc.final_archive_path checksum = self.get_new_checksum(arc.final_archive_path) if checksum is not None: chksum_name = self.obfuscate_string( f"{arc_path.split('/')[-1]}.{self.hash_name}" ) with open(os.path.join(self.sys_tmp, chksum_name), 'w', encoding='utf-8') as cf: cf.write(checksum) self.write_cleaner_log() final_path = os.path.join( self.sys_tmp, self.obfuscate_string(arc_path.split('/')[-1]) ) shutil.move(arc_path, final_path) arcstat = os.stat(final_path) # while these messages won't be included in the log file in the archive # some facilities, such as our avocado test suite, will sometimes not # capture print() output, so leverage the ui_log to print to console self.ui_log.info( f"A mapping of obfuscated elements is available at\n\t{map_path}" ) self.ui_log.info( f"\nThe obfuscated archive is available at\n\t{final_path}\n" ) self.ui_log.info(f"\tSize\t{get_human_readable(arcstat.st_size)}") self.ui_log.info(f"\tOwner\t{getpwuid(arcstat.st_uid).pw_name}\n") self.ui_log.info("Please send the obfuscated archive to your support " "representative and keep the mapping file private") self.cleanup() return None def rebuild_nested_archive(self): """Handles repacking the nested tarball, now containing only obfuscated copies of the reports, log files, manifest, etc... 
""" # we have an archive of archives, so repack the obfuscated tarball arc_name = self.arc_name + '-obfuscated' self.setup_archive(name=arc_name) for archive in self.completed_reports: arc_dest = archive.final_archive_path.split('/')[-1] checksum = self.get_new_checksum(archive.final_archive_path) if checksum is not None: dname = f"checksums/{arc_dest}.{self.hash_name}" self.archive.add_string(checksum, dest=dname) for dirn, _, files in os.walk(self.nested_archive.extracted_path): for filename in files: fname = os.path.join(dirn, filename) dname = fname.split(self.nested_archive.extracted_path)[-1] dname = dname.lstrip('/') self.archive.add_file(fname, dest=dname) # remove it now so we don't balloon our fs space needs os.remove(fname) self.write_cleaner_log(archive=True) return self.archive.finalize(self.opts.compression_type) def compile_mapping_dict(self): """Build a dict that contains each parser's map as a key, with the contents as that key's value. This will then be written to disk in the same directory as the obfuscated report so that sysadmins have a way to 'decode' the obfuscation locally """ _map = {} for parser in self.parsers: _map[parser.map_file_key] = {} _map[parser.map_file_key].update(parser.get_map_contents()) return _map def write_map_to_file(self, _map, path): """Write the mapping to a file on disk that is in the same location as the final archive(s). """ with open(path, 'w', encoding='utf-8') as mf: mf.write(json.dumps(_map, indent=4)) return path def write_map_for_archive(self, _map): try: map_path = os.path.join( self.sys_tmp, self.obfuscate_string(f"{self.arc_name}-private_map") ) return self.write_map_to_file(_map, map_path) except Exception as err: self.log_error(f"Could not write private map file: {err}") return None def write_map_for_config(self, _map): """Write the mapping to the config file so that subsequent runs are able to provide the same consistent mapping """ if self.opts.map_file and not self.opts.no_update: cleaner_dir = os.path.dirname(self.opts.map_file) # Attempt to create the directory /etc/sos/cleaner # just in case it didn't exist previously try: os.makedirs(cleaner_dir, exist_ok=True) self.write_map_to_file(_map, self.opts.map_file) self.log_debug(f"Wrote mapping to {self.opts.map_file}") except Exception as err: self.log_error(f"Could not update mapping config file: {err}") def write_cleaner_log(self, archive=False): """When invoked via the command line, the logging from SoSCleaner will not be added to the archive(s) it processes, so we need to write it separately to disk """ log_name = os.path.join( self.sys_tmp, f"{self.arc_name}-obfuscation.log" ) with open(log_name, 'w', encoding='utf-8') as logfile: self.sos_log_file.seek(0) for line in self.sos_log_file.readlines(): logfile.write(line) if archive: self.obfuscate_file(log_name) self.archive.add_file(log_name, dest="sos_logs/cleaner.log") def get_new_checksum(self, archive_path): """Calculate a new checksum for the obfuscated archive, as the previous checksum will no longer be valid """ try: hash_size = 1024**2 # Hash 1MiB of content at a time. with open(archive_path, 'rb') as archive_fp: digest = hashlib.new(self.hash_name) while True: hashdata = archive_fp.read(hash_size) if not hashdata: break digest.update(hashdata) return digest.hexdigest() + '\n' except Exception as err: self.log_debug(f"Could not generate new checksum: {err}") return None def obfuscate_report_paths(self): """Perform the obfuscation for each archive or sos directory discovered during setup. 
Each archive is handled in a separate thread, up to self.opts.jobs will be obfuscated concurrently. """ try: msg = ( f"Found {len(self.report_paths)} total reports to obfuscate, " f"processing up to {self.opts.jobs} concurrently within one " "archive\n" ) self.ui_log.info(msg) if self.opts.keep_binary_files: self.ui_log.warning( "WARNING: binary files that potentially contain sensitive " "information will NOT be removed from the final archive\n" ) for report_path in self.report_paths: self.ui_log.info(f"Obfuscating {report_path.archive_path}") self.obfuscate_report(report_path) # finally, obfuscate the nested archive if one exists if self.nested_archive: self._replace_obfuscated_archives() self.obfuscate_report(self.nested_archive) except KeyboardInterrupt: self.ui_log.info("Exiting on user cancel") os._exit(130) def _replace_obfuscated_archives(self): """When we have a nested archive, we need to rebuild the original archive, which entails replacing the existing archives with their obfuscated counterparts """ for archive in self.completed_reports: os.remove(archive.archive_path) dest = self.nested_archive.extracted_path tarball = archive.final_archive_path.split('/')[-1] dest_name = os.path.join(dest, tarball) shutil.move(archive.final_archive_path, dest) archive.final_archive_path = dest_name def generate_parser_item_regexes(self): """For the parsers that use prebuilt lists of items, generate those regexes now since all the parsers should be preloaded by the archive(s) as well as being handed cmdline options and mapping file configuration. """ for parser in self.parsers: parser.generate_item_regexes() def _prepare_archive_with_prepper(self, archive, prepper): """ For each archive we've determined we need to operate on, pass it to each prepper so that we can extract necessary files and/or items for direct regex replacement. Preppers define these methods per parser, so it is possible that a single prepper will read the same file for different parsers/mappings. This is preferable to the alternative of building up monolithic lists of file paths, as we'd still need to manipulate these on a per-archive basis. :param archive: The archive we are currently using to prepare our mappings with :type archive: ``SoSObfuscationArchive`` subclass :param prepper: The individual prepper we're using to source items :type prepper: ``SoSPrepper`` subclass """ for _parser in self.parsers: pname = _parser.name.lower().split()[0].strip() for _file in prepper.get_parser_file_list(pname, archive): content = archive.get_file_content(_file) if not content: continue self.log_debug(f"Prepping {pname} parser with file {_file} " f"from {archive.ui_name}") for line in content.splitlines(): try: _parser.parse_line(line) except Exception as err: self.log_debug( f"Failed to prep {pname} map from {_file}: {err}" ) map_items = prepper.get_items_for_map(pname, archive) if map_items: self.log_debug(f"Prepping {pname} mapping with items from " f"{archive.ui_name}") for item in map_items: _parser.mapping.add(item) for ritem in prepper.regex_items[pname]: _parser.mapping.add_regex_item(ritem) # we must initialize stuff inside (cloned processes') archive - REALLY? 
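        # Attaching the parsers to the archive object itself matters because
        # the per-file obfuscation runs later in separate worker processes via
        # the module-level obfuscate_arc_files() helper: each worker only
        # receives the archive (and whatever state it carries), not this
        # SoSCleaner instance, so the parsers seemingly have to travel with
        # the archive.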
archive.set_parsers(self.parsers) def get_preppers(self): """ Discover all locally available preppers so that we can prepare the mappings with obfuscation matches in a controlled manner :returns: All preppers that can be leveraged locally :rtype: A generator of `SoSPrepper` items """ helper = ImporterHelper(sos.cleaner.preppers) preps = [] for _prep in helper.get_modules(): preps.extend(import_module(f"sos.cleaner.preppers.{_prep}")) for prepper in sorted(preps, key=lambda x: x.priority): yield prepper(options=self.opts) def preload_all_archives_into_maps(self): """Before doing the actual obfuscation, if we have multiple archives to obfuscate then we need to preload each of them into the mappings to ensure that node1 is obfuscated in node2 as well as node2 being obfuscated in node1's archive. """ self.log_info("Pre-loading all archives into obfuscation maps") for prepper in self.get_preppers(): for archive in self.report_paths: self._prepare_archive_with_prepper(archive, prepper) self.main_archive.set_parsers(self.parsers) def obfuscate_report(self, archive): # pylint: disable=too-many-branches """Individually handle each archive or directory we've discovered by running through each file therein. Positional arguments: :param archive str: Filepath to the directory or archive """ try: arc_md = self.cleaner_md.add_section(archive.archive_name) start_time = datetime.now() arc_md.add_field('start_time', start_time) # don't double extract nested archives if not archive.is_extracted: archive.extract() archive.report_msg("Beginning obfuscation...") file_list = list(archive.get_files()) # we can't call simple # executor.map(archive.obfuscate_arc_files,archive.get_files()) # because a child process does not carry forward internal changes # (e.g. mappings' datasets) from one call of obfuscate_arc_files # method to another. Each obfuscate_arc_files method starts with # vanilla parent archive, that is initialised *once* at its # beginning via initializer=archive.load_parser_entries # - but not afterwards.. # # So we must pass list of all files for each worker at the # beginning. This means less granularity of the child processes # work (one worker can finish much sooner than the other), but # it is the best we can have (or have found) # # At least, the "file_list[i::self.opts.jobs]" means subsequent # files (speculativelly of similar size and content) are # distributed to different processes, which attempts to split the # load evenly. Yet better approach might be reorderig file_list # based on files' sizes. files_obfuscated_count = total_sub_count = removed_file_count = 0 archive_list = [archive for i in range(self.opts.jobs)] with ProcessPoolExecutor( max_workers=self.opts.jobs, initializer=archive.load_parser_entries) as executor: futures = executor.map(obfuscate_arc_files, archive_list, [file_list[i::self.opts.jobs] for i in range(self.opts.jobs)]) for (foc, tsc, rfc) in futures: files_obfuscated_count += foc total_sub_count += tsc removed_file_count += rfc # As there is no easy way to get dataset dicts from child # processes' mappings, we can reload our own parent-process # archive from the disk files. The trick is that sequence of # files/entries is the source of truth of *sequence* of calling # *all* mapping.all(item) methods - so replaying this will # generate the right datasets! 
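            # A concrete example of the "file_list[i::self.opts.jobs]" split
            # described above, using made-up values: with jobs == 2 and five
            # files [f0, f1, f2, f3, f4], the two workers are handed
            # file_list[0::2] == [f0, f2, f4] and file_list[1::2] == [f1, f3],
            # i.e. a round-robin distribution of the collected files.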
archive.load_parser_entries() try: self.obfuscate_directory_names(archive) except Exception as err: self.log_info(f"Failed to obfuscate directories: {err}", caller=archive.archive_name) try: self.obfuscate_symlinks(archive) except Exception as err: self.log_info(f"Failed to obfuscate symlinks: {err}", caller=archive.archive_name) # if the archive was already a tarball, repack it if not archive.is_nested: method = archive.get_compression() if method: archive.report_msg("Re-compressing...") try: archive.rename_top_dir( self.obfuscate_string(archive.archive_name) ) archive.compress(method) except Exception as err: self.log_debug(f"Archive {archive.archive_name} failed" f" to compress: {err}") archive.report_msg( f"Failed to re-compress archive: {err}") return self.completed_reports.append(archive) end_time = datetime.now() arc_md.add_field('end_time', end_time) arc_md.add_field('run_time', end_time - start_time) arc_md.add_field('files_obfuscated', files_obfuscated_count) arc_md.add_field('total_substitutions', total_sub_count) rmsg = '' if removed_file_count: rmsg = " [removed %s unprocessable files]" rmsg = rmsg % removed_file_count archive.report_msg(f"Obfuscation completed{rmsg}") except Exception as err: self.ui_log.info("Exception while processing " f"{archive.archive_name}: {err}") def obfuscate_file(self, filename): self.main_archive.obfuscate_arc_files([filename]) def obfuscate_symlinks(self, archive): """Iterate over symlinks in the archive and obfuscate their names. The content of the link target will have already been cleaned, and this second pass over just the names of the links is to ensure we avoid a possible race condition dependent on the order in which the link or the target get obfuscated. :param archive: The archive being obfuscated :type archive: ``SoSObfuscationArchive`` """ self.log_info("Obfuscating symlink names", caller=archive.archive_name) for symlink in archive.get_symlinks(): try: # relative name of the symlink in the archive _sym = symlink.split(archive.extracted_path)[1].lstrip('/') # don't obfuscate symlinks for files that we skipped the first # obfuscation of, as that would create broken links _parsers = [ _p for _p in self.parsers if not any(_skip.match(_sym) for _skip in _p.skip_patterns) ] if not _parsers: self.log_debug( f"Skipping obfuscation of symlink {_sym} due to skip " f"pattern match" ) continue self.log_debug(f"Obfuscating symlink {_sym}", caller=archive.archive_name) # current target of symlink, again relative to the archive _target = os.readlink(symlink) # get the potentially renamed symlink name, this time the full # path as it exists on disk _ob_sym_name = os.path.join(archive.extracted_path, self.obfuscate_string(_sym)) # get the potentially renamed relative target filename _ob_target = self.obfuscate_string(_target) # if either the symlink name or the target name has changed, # recreate the symlink if (_ob_sym_name != symlink) or (_ob_target != _target): os.remove(symlink) os.symlink(_ob_target, _ob_sym_name) except Exception as err: self.log_info(f"Error obfuscating symlink '{symlink}': {err}") def obfuscate_directory_names(self, archive): """For all directories that exist within the archive, obfuscate the directory name if it contains sensitive strings found during execution """ self.log_info("Obfuscating directory names in archive " f"{archive.archive_name}") for dirpath in sorted(archive.get_directory_list(), reverse=True): for _name in os.listdir(dirpath): _dirname = os.path.join(dirpath, _name) _arc_dir = 
_dirname.split(archive.extracted_path)[-1] if os.path.isdir(_dirname): _ob_dirname = self.obfuscate_string(_name) if _ob_dirname != _name: _ob_arc_dir = _arc_dir.rstrip(_name) _ob_arc_dir = os.path.join( archive.extracted_path, _ob_arc_dir.lstrip('/'), _ob_dirname ) os.rename(_dirname, _ob_arc_dir) # TODO: this is a duplicate method from SoSObfuscationArchive but we can't # easily remove either of them..? def obfuscate_string(self, string_data): for parser in self.parsers: try: string_data = parser.parse_string_for_keys(string_data) except Exception as err: self.log_info(f"Error obfuscating string data: {err}") return string_data def write_stats_to_manifest(self): """Write some cleaner-level, non-report-specific stats to the manifest """ parse_sec = self.cleaner_md.add_section('parsers') for parser in self.parsers: _sec = parse_sec.add_section(parser.name.replace(' ', '_').lower()) _sec.add_field('entries', len(parser.mapping.dataset.keys())) # vim: set et ts=4 sw=4 : preppers/__pycache__/__init__.cpython-39.opt-1.pyc000064400000011771151116317160015754 0ustar00a \hW@sddlZGdddZdS)Nc@sPeZdZdZdZdZddZddZdd Zd d Z d d Z ddZ ddZ dS) SoSPreppera A prepper is a way to prepare loaded mappings with selected items within an sos report prior to beginning the full obfuscation routine. This was previously handled directly within archives, however this is a bit cumbersome and doesn't allow for all the flexibility we could use in this effort. Preppers are separated from parsers but will leverage them in order to feed parser-matched strings from files highlighted by a Prepper() to the appropriate mapping for initial obfuscation. Preppers may specify their own priority in order to influence the order in which mappings are prepped. Further, Preppers have two ways to prepare the maps - either by generating a list of filenames or via directly pulling content out of select files without the assistance of a parser. A lower priority value means the prepper should run sooner than those with higher values. For the former approach, `Prepper._get_$parser_file_list()` should be used and should yield filenames that exist in target archives. For the latter, the `Prepper._get_items_for_$map()` should be used. Finally, a `regex_items` dict is available for storing individual regex items for parsers that rely on them. These items will be added after all files and other individual items are handled. This dict has keys set to parser/mapping names, and the values should be sets of items, so preppers should add to them like so: self.regex_items['hostname'].add('myhostname') Z UndefineddcCsBttttttd|_||_td|_td|_dS)N)hostnameipZipv6keywordZmacusernameZsosZsos_ui)setZ regex_itemsZoptsloggingZ getLoggersoslogZui_log)selfoptionsr A/usr/lib/python3.9/site-packages/sos/cleaner/preppers/__init__.py__init__2s zSoSPrepper.__init__cCsd|jd|S)Nz [prepper:z] )namer msgr r r _fmt_log_msg?szSoSPrepper._fmt_log_msgcCs|j||dSN)r debugrrr r r log_debugBszSoSPrepper.log_debugcCs|j||dSr)r inforrr r rlog_infoEszSoSPrepper.log_infocCs|j||dSr)r errorrrr r r log_errorHszSoSPrepper.log_errorcCs(d|d}t||r$t|||SgS)a Helper that calls the appropriate Prepper method for the specified parser. This allows Preppers to be able to provide items for multiple types of parsers without needing to handle repetitious logic to determine which parser we're interested within each individual call. The convention to use is to define `_get_$parser_file_list()` methods within Preppers, e.g. 
`_get_hostname_file_list()` would be used to provide filenames for the hostname parser. If such a method is not defined within a Prepper for a given parser, we handle that here so that individual Preppers do not need to. :param parser: The _name_ of the parser to get a file list for :type parser: ``str`` :param archive: The archive we are operating on currently for the specified parser :type archive: ``SoSObfuscationArchive`` :returns: A list of filenames within the archive to prep with :rtype: ``list`` Z_get_Z _file_listhasattrgetattr)r parserarchive_checkr r rget_parser_file_listKs  zSoSPrepper.get_parser_file_listcCs&d|}t||r"t|||SgS)a Similar to `get_parser_file_list()`, a helper for calling the specific method for generating items for the given `map`. This allows Preppers to be able to provide items for multiple types of maps, without the need to handle repetitious logic to determine which parser we're interested in within each individual call. :param mapping: The _name_ of the mapping to get items for :type mapping: ``str`` :param archive: The archive we are operating on currently for the specified parser :type archive: ``SoSObfuscationArchive`` :returns: A list of distinct items to obfuscate without using a parser :rtype: ``list`` Z_get_items_for_r)r mappingrr r r rget_items_for_mapgs  zSoSPrepper.get_items_for_mapN) __name__ __module__ __qualname____doc__rpriorityrrrrrr!r#r r r rrs  r)r rr r r r spreppers/__pycache__/__init__.cpython-39.pyc000064400000011771151116317160015015 0ustar00a \hW@sddlZGdddZdS)Nc@sPeZdZdZdZdZddZddZdd Zd d Z d d Z ddZ ddZ dS) SoSPreppera A prepper is a way to prepare loaded mappings with selected items within an sos report prior to beginning the full obfuscation routine. This was previously handled directly within archives, however this is a bit cumbersome and doesn't allow for all the flexibility we could use in this effort. Preppers are separated from parsers but will leverage them in order to feed parser-matched strings from files highlighted by a Prepper() to the appropriate mapping for initial obfuscation. Preppers may specify their own priority in order to influence the order in which mappings are prepped. Further, Preppers have two ways to prepare the maps - either by generating a list of filenames or via directly pulling content out of select files without the assistance of a parser. A lower priority value means the prepper should run sooner than those with higher values. For the former approach, `Prepper._get_$parser_file_list()` should be used and should yield filenames that exist in target archives. For the latter, the `Prepper._get_items_for_$map()` should be used. Finally, a `regex_items` dict is available for storing individual regex items for parsers that rely on them. These items will be added after all files and other individual items are handled. 
preppers/__init__.py000064400000011527151116317160010525 0ustar00# Copyright 2023 Red Hat, Inc. Jake Hunsaker

# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.

import logging


class SoSPrepper():
    """
    A prepper is a way to prepare loaded mappings with selected items within
    an sos report prior to beginning the full obfuscation routine.

    This was previously handled directly within archives, however this is a
    bit cumbersome and doesn't allow for all the flexibility we could use in
    this effort.

    Preppers are separated from parsers but will leverage them in order to
    feed parser-matched strings from files highlighted by a Prepper() to the
    appropriate mapping for initial obfuscation.

    Preppers may specify their own priority in order to influence the order
    in which mappings are prepped. Further, Preppers have two ways to prepare
    the maps - either by generating a list of filenames or via directly
    pulling content out of select files without the assistance of a parser.
    A lower priority value means the prepper should run sooner than those
    with higher values.

    For the former approach, `Prepper._get_$parser_file_list()` should be
    used and should yield filenames that exist in target archives. For the
    latter, the `Prepper._get_items_for_$map()` should be used.

    Finally, a `regex_items` dict is available for storing individual regex
    items for parsers that rely on them. These items will be added after all
    files and other individual items are handled. This dict has keys set to
    parser/mapping names, and the values should be sets of items, so preppers
    should add to them like so:

        self.regex_items['hostname'].add('myhostname')
    """

    name = 'Undefined'
    priority = 100

    def __init__(self, options):
        self.regex_items = {
            'hostname': set(),
            'ip': set(),
            'ipv6': set(),
            'keyword': set(),
            'mac': set(),
            'username': set()
        }
        self.opts = options
        self.soslog = logging.getLogger('sos')
        self.ui_log = logging.getLogger('sos_ui')

    def _fmt_log_msg(self, msg):
        return f"[prepper:{self.name}] {msg}"

    def log_debug(self, msg):
        self.soslog.debug(self._fmt_log_msg(msg))

    def log_info(self, msg):
        self.soslog.info(self._fmt_log_msg(msg))

    def log_error(self, msg):
        self.soslog.error(self._fmt_log_msg(msg))

    def get_parser_file_list(self, parser, archive):
        """
        Helper that calls the appropriate Prepper method for the specified
        parser.
        This allows Preppers to be able to provide items for multiple types
        of parsers without needing to handle repetitious logic to determine
        which parser we're interested in within each individual call.

        The convention to use is to define `_get_$parser_file_list()` methods
        within Preppers, e.g. `_get_hostname_file_list()` would be used to
        provide filenames for the hostname parser. If such a method is not
        defined within a Prepper for a given parser, we handle that here so
        that individual Preppers do not need to.

        :param parser: The _name_ of the parser to get a file list for
        :type parser: ``str``

        :param archive: The archive we are operating on currently for the
                        specified parser
        :type archive: ``SoSObfuscationArchive``

        :returns: A list of filenames within the archive to prep with
        :rtype: ``list``
        """
        _check = f"_get_{parser}_file_list"
        if hasattr(self, _check):
            return getattr(self, _check)(archive)
        return []

    def get_items_for_map(self, mapping, archive):
        """
        Similar to `get_parser_file_list()`, a helper for calling the
        specific method for generating items for the given `map`.

        This allows Preppers to be able to provide items for multiple types
        of maps, without the need to handle repetitious logic to determine
        which parser we're interested in within each individual call.

        :param mapping: The _name_ of the mapping to get items for
        :type mapping: ``str``

        :param archive: The archive we are operating on currently for the
                        specified parser
        :type archive: ``SoSObfuscationArchive``

        :returns: A list of distinct items to obfuscate without using a
                  parser
        :rtype: ``list``
        """
        _check = f"_get_items_for_{mapping}"
        if hasattr(self, _check):
            return getattr(self, _check)(archive)
        return []

# vim: set et ts=4 sw=4 :
preppers/hostname.py000064400000004330151116317160010576 0ustar00# Copyright 2023 Red Hat, Inc. Jake Hunsaker

# This file is part of the sos project: https://github.com/sosreport/sos
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# version 2 of the GNU General Public License.
#
# See the LICENSE file in the source distribution for further information.

from sos.cleaner.preppers import SoSPrepper


class HostnamePrepper(SoSPrepper):
    """
    Prepper for providing domain and hostname information to the hostname
    mapping.

    The items from hostname sources are handled manually via the
    _get_items_for_hostname method, rather than passing the file directly,
    as the parser does not know what hostnames or domains to match on
    initially.

    This will also populate the regex_items list with local short names.
""" name = 'hostname' def _get_items_for_hostname(self, archive): items = [] _file = 'hostname' if archive.is_sos: _file = 'sos_commands/host/hostname_-f' elif archive.is_insights: _file = 'data/insights_commands/hostname_-f' content = archive.get_file_content(_file) if content and content != 'localhost': domains = content.split('.') if len(domains) > 1: items.append(domains[0]) self.regex_items['hostname'].add((domains[0])) if len(domains) > 3: # make sure we get example.com if the system's hostname # is something like foo.bar.example.com top_domain = '.'.join(domains[-2:]) items.append(top_domain.strip()) items.append(content.strip()) _hosts = archive.get_file_content('etc/hosts') for line in _hosts.splitlines(): if line.startswith('#') or 'localhost' in line: continue hostln = line.split()[1:] for host in hostln: if len(host.split('.')) == 1: self.regex_items['hostname'].add(host) else: items.append(host) for domain in self.opts.domains: items.append(domain) return items preppers/ip.py000064400000002032151116317160007365 0ustar00# Copyright 2023 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. from sos.cleaner.preppers import SoSPrepper class IPPrepper(SoSPrepper): """ This prepper is for IP network addresses. The aim of this prepper is to provide the file path for where the output of `ip addr` is saved. """ name = 'ip' def _get_ipv6_file_list(self, archive): return self._get_ip_file_list(archive) def _get_ip_file_list(self, archive): _files = [] if archive.is_sos: _files = ['sos_commands/networking/ip_-o_addr'] elif archive.is_insights: _files = ['data/insights_commands/ip_addr'] return _files # vim: set et ts=4 sw=4 : preppers/keywords.py000064400000002211151116317160010623 0ustar00# Copyright 2023 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import os from sos.cleaner.preppers import SoSPrepper class KeywordPrepper(SoSPrepper): """ Prepper to handle keywords passed to cleaner via either the `--keywords` or `--keyword-file` options. """ name = 'keyword' # pylint: disable=unused-argument def _get_items_for_keyword(self, archive): items = [] for kw in self.opts.keywords: items.append(kw) if self.opts.keyword_file and os.path.exists(self.opts.keyword_file): with open(self.opts.keyword_file, 'r', encoding='utf-8') as kwf: items.extend(kwf.read().splitlines()) for item in items: self.regex_items['keyword'].add(item) return items # vim: set et ts=4 sw=4 : preppers/mac.py000064400000001541151116317160007521 0ustar00# Copyright 2023 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. 
from sos.cleaner.preppers import SoSPrepper class MacPrepper(SoSPrepper): """ Prepper for sourcing the host's MAC address in order to prep the mapping. """ name = 'mac' def _get_mac_file_list(self, archive): if archive.is_sos: return ['sos_commands/networking/ip_-d_address'] if archive.is_insights: return ['data/insights_commands/ip_addr'] return [] # vim: set et ts=4 sw=4 : preppers/usernames.py000064400000005020151116317160010757 0ustar00# Copyright 2023 Red Hat, Inc. Jake Hunsaker # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. from sos.cleaner.preppers import SoSPrepper class UsernamePrepper(SoSPrepper): """ This prepper is used to source usernames from various `last` output content as well as a couple select files. This prepper will also leverage the --usernames option. """ name = 'username' skip_list = [ 'core', 'nobody', 'nfsnobody', 'shutdown', 'stack', 'reboot', 'root', 'timeout:', 'ubuntu', 'username', 'wtmp' ] def _get_items_for_username(self, archive): items = set() _files = [ 'sos_commands/login/lastlog_-u_1000-60000', 'sos_commands/login/lastlog_-u_60001-65536', 'sos_commands/login/lastlog_-u_65537-4294967295', 'sos_commands/login/lastlog2', # AD users will be reported here, but favor the lastlog files since # those will include local users who have not logged in 'sos_commands/login/last', 'sos_commands/login/last_-F', 'sos_commands/login/lslogins', 'etc/cron.allow', 'etc/cron.deny' ] for _file in _files: content = archive.get_file_content(_file) if not content: continue for line in content.splitlines(): try: user = line.split()[0].lower() if "lslogins" in _file: if int(line.split()[0]) >= 1000: user = line.split()[1].lower() else: continue if user and user not in self.skip_list: items.add(user) if '\\' in user: items.add(user.split('\\')[-1]) except Exception: # empty line or otherwise unusable for name sourcing pass for opt_user in self.opts.usernames: if opt_user not in self.skip_list: items.add(opt_user) return items # vim: set et ts=4 sw=4 :
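
The hook conventions documented in preppers/__init__.py can be illustrated with a short, self-contained sketch. Everything below is hypothetical and not shipped with sos: the MyAppPrepper class, the 'myapp' name, and the file paths are invented for illustration. The sketch only relies on the `_get_$parser_file_list()` / `_get_items_for_$map()` dispatch shown above and on the `archive.is_sos` / `archive.get_file_content()` helpers that the built-in preppers already use.

from sos.cleaner.preppers import SoSPrepper


class MyAppPrepper(SoSPrepper):
    """
    Hypothetical prepper: feeds one file to the hostname parser and hands a
    few literal strings straight to the keyword map.
    """

    name = 'myapp'      # invented name, used only in log messages
    priority = 150      # lower values run sooner; this runs after built-ins

    def _get_hostname_file_list(self, archive):
        # Filenames returned here are handed to the hostname parser, which
        # pulls matches from them to seed the hostname map.
        if archive.is_sos:
            return ['sos_commands/myapp/myapp_status']  # hypothetical path
        return []

    def _get_items_for_keyword(self, archive):
        # Strings returned here go straight into the keyword map without a
        # parser. The guard mirrors how the built-in preppers treat missing
        # files (get_file_content() may return empty content).
        items = []
        content = archive.get_file_content('etc/myapp/app.conf') or ''
        for line in content.splitlines():
            if line.startswith('cluster_name'):
                items.append(line.split('=', 1)[-1].strip())
        return items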
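
For a concrete view of what a prepper produces, here is a minimal, hypothetical harness around the HostnamePrepper above. The FakeArchive class and the sample hostnames are invented; it mimics only the attributes and the single method that the prepper actually touches.

from types import SimpleNamespace

from sos.cleaner.preppers.hostname import HostnamePrepper


class FakeArchive:
    # Stand-in providing only what _get_items_for_hostname() uses.
    is_sos = True
    is_insights = False

    def __init__(self, files):
        self.files = files

    def get_file_content(self, path):
        return self.files.get(path, '')


opts = SimpleNamespace(domains=['lab.example.com'])
prepper = HostnamePrepper(opts)
archive = FakeArchive({
    'sos_commands/host/hostname_-f': 'node1.prod.example.com',
    'etc/hosts': '127.0.0.1 localhost\n10.0.0.5 db01.prod.example.com db01',
})

# Dispatches to _get_items_for_hostname() via the base class helper:
print(prepper.get_items_for_map('hostname', archive))
# -> ['node1', 'example.com', 'node1.prod.example.com',
#     'db01.prod.example.com', 'lab.example.com']
# Short names ('node1', 'db01') are recorded in regex_items['hostname']:
print(sorted(prepper.regex_items['hostname']))
# -> ['db01', 'node1']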