Python script with parallel statistical model breaks when run locally

Asked by Hack-R on 2/16/2016 · Last edited by Hack-R · Updated 2/16/2016 · Viewed 415 times

Q:

I have a Python script that was developed on Kaggle.com's servers. I want to run it locally, but when I try it produces errors and never finishes.

I believe this is related to the parallel nature of one of the statistical algorithms, but I can't figure out what I need in order to set it up correctly on my local machine.

Here is the script:

'''
An open source script from Kaggle, developed by a couple dozen people. Runs fine there.
'''

if __name__ == '__main__':    

    import time
    start_time = time.time()

    print("Starting imports")
    import numpy as np
    import pandas as pd
    from sklearn.ensemble import RandomForestRegressor
    from sklearn import pipeline, grid_search
    from sklearn.base import BaseEstimator, TransformerMixin
    from sklearn.pipeline import FeatureUnion
    from sklearn.decomposition import TruncatedSVD
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics import mean_squared_error, make_scorer
    from nltk.stem.porter import PorterStemmer
    stemmer = PorterStemmer()
    import re
    import random
    random.seed(2016)

    print("Reading in data")
    df_train    = pd.read_csv('train.csv', encoding="ISO-8859-1")
    df_test     = pd.read_csv('test.csv', encoding="ISO-8859-1")
    df_pro_desc = pd.read_csv('product_descriptions.csv')
    df_attr     = pd.read_csv('attributes.csv')
    print("pull brand")
    df_brand    = df_attr[df_attr.name == "MFG Brand Name"][["product_uid", "value"]].rename(columns={"value": "brand"})
    print("dr_train.shape[0]")
    num_train   = df_train.shape[0]
    df_all = pd.concat((df_train, df_test), axis=0, ignore_index=True)
    df_all = pd.merge(df_all, df_pro_desc, how='left', on='product_uid')
    df_all = pd.merge(df_all, df_brand, how='left', on='product_uid')

    print("Functions")
    def str_stem(s): 
        if isinstance(s, str):
            s = s.lower()
            s = s.replace("'","in.") 
            s = s.replace("inches","in.") 
            s = s.replace("inch","in.")
            s = s.replace(" in ","in. ") 
            s = s.replace(" in.","in.") 

            s = s.replace("''","ft.") 
            s = s.replace(" feet ","ft. ") 
            s = s.replace("feet","ft.") 
            s = s.replace("foot","ft.") 
            s = s.replace(" ft ","ft. ") 
            s = s.replace(" ft.","ft.") 

            s = s.replace(" pounds ","lb. ")
            s = s.replace(" pound ","lb. ") 
            s = s.replace("pound","lb.") 
            s = s.replace(" lb ","lb. ") 
            s = s.replace(" lb.","lb.") 
            s = s.replace(" lbs ","lb. ") 
            s = s.replace("lbs.","lb.") 

            s = s.replace(" x "," xby ")
            s = s.replace("*"," xby ")
            s = s.replace(" by "," xby")
            s = s.replace("x0"," xby 0")
            s = s.replace("x1"," xby 1")
            s = s.replace("x2"," xby 2")
            s = s.replace("x3"," xby 3")
            s = s.replace("x4"," xby 4")
            s = s.replace("x5"," xby 5")
            s = s.replace("x6"," xby 6")
            s = s.replace("x7"," xby 7")
            s = s.replace("x8"," xby 8")
            s = s.replace("x9"," xby 9")
            s = s.replace("0x","0 xby ")
            s = s.replace("1x","1 xby ")
            s = s.replace("2x","2 xby ")
            s = s.replace("3x","3 xby ")
            s = s.replace("4x","4 xby ")
            s = s.replace("5x","5 xby ")
            s = s.replace("6x","6 xby ")
            s = s.replace("7x","7 xby ")
            s = s.replace("8x","8 xby ")
            s = s.replace("9x","9 xby ")

            s = s.replace(" sq ft","sq.ft. ") 
            s = s.replace("sq ft","sq.ft. ")
            s = s.replace("sqft","sq.ft. ")
            s = s.replace(" sqft ","sq.ft. ") 
            s = s.replace("sq. ft","sq.ft. ") 
            s = s.replace("sq ft.","sq.ft. ") 
            s = s.replace("sq feet","sq.ft. ") 
            s = s.replace("square feet","sq.ft. ") 

            s = s.replace(" gallons ","gal. ") 
            s = s.replace(" gallon ","gal. ") 
            s = s.replace("gallons","gal.") 
            s = s.replace("gallon","gal.") 
            s = s.replace(" gal ","gal. ") 
            s = s.replace(" gal","gal.") 

            s = s.replace("ounces","oz.")
            s = s.replace("ounce","oz.")
            s = s.replace(" oz.","oz. ")
            s = s.replace(" oz ","oz. ")

            s = s.replace("centimeters","cm.")    
            s = s.replace(" cm.","cm.")
            s = s.replace(" cm ","cm. ")

            s = s.replace("wayy", "way")
            s = s.replace("milimeters","mm.")
            s = s.replace(" mm.","mm.")
            s = s.replace(" mm ","mm. ")

            s = s.replace("°","deg. ")
            s = s.replace("degrees","deg. ")
            s = s.replace("degree","deg. ")

            s = s.replace("volts","volt. ")
            s = s.replace("volt","volt. ")

            s = s.replace("watts","watt. ")
            s = s.replace("watt","watt. ")

            s = s.replace("ampere","amp. ")
            s = s.replace("amps","amp. ")
            s = s.replace(" amp ","amp. ")

            s = s.replace("whirpool","whirlpool")
            s = s.replace("whirlpoolga", "whirlpool")
            s = s.replace("whirlpoolstainless","whirlpool stainless")

            s = s.replace("  "," ")
            s = (" ").join([stemmer.stem(z) for z in s.split(" ")])
            return s.lower()
        else:
            return "null"

    def str_common_word(str1, str2):
        words, cnt = str1.split(), 0
        for word in words:
            if str2.find(word)>=0:
                cnt+=1
        return cnt

    def str_whole_word(str1, str2, i_):
        cnt = 0
        while i_ < len(str2):
            i_ = str2.find(str1, i_)
            if i_ == -1:
                return cnt
            else:
                cnt += 1
                i_ += len(str1)
        return cnt

    def fmean_squared_error(ground_truth, predictions):
        fmean_squared_error_ = mean_squared_error(ground_truth, predictions)**0.5
        return fmean_squared_error_

    RMSE  = make_scorer(fmean_squared_error, greater_is_better=False)

    class cust_regression_vals(BaseEstimator, TransformerMixin):
        def fit(self, x, y=None):
            return self
        def transform(self, hd_searches):
            d_col_drops=['id','relevance','search_term','product_title','product_description','product_info','attr','brand']
            hd_searches = hd_searches.drop(d_col_drops,axis=1).values
            return hd_searches

    class cust_txt_col(BaseEstimator, TransformerMixin):
        def __init__(self, key):
            self.key = key
        def fit(self, x, y=None):
            return self
        def transform(self, data_dict):
            return data_dict[self.key].apply(str)

    #if adding features consider any drops on the 'cust_regression_vals' class
    df_all['search_term'] = df_all['search_term'].map(lambda x:str_stem(x))
    df_all['product_title'] = df_all['product_title'].map(lambda x:str_stem(x))
    df_all['product_description'] = df_all['product_description'].map(lambda x:str_stem(x))
    df_all['brand'] = df_all['brand'].map(lambda x:str_stem(x))
    df_all['len_of_query'] = df_all['search_term'].map(lambda x:len(x.split())).astype(np.int64)
    df_all['len_of_title'] = df_all['product_title'].map(lambda x:len(x.split())).astype(np.int64)
    df_all['len_of_description'] = df_all['product_description'].map(lambda x:len(x.split())).astype(np.int64)
    df_all['len_of_brand'] = df_all['brand'].map(lambda x:len(x.split())).astype(np.int64)
    df_all['product_info'] = df_all['search_term']+"\t"+df_all['product_title'] +"\t"+df_all['product_description']
    df_all['query_in_title'] = df_all['product_info'].map(lambda x:str_whole_word(x.split('\t')[0],x.split('\t')[1],0))
    df_all['query_in_description'] = df_all['product_info'].map(lambda x:str_whole_word(x.split('\t')[0],x.split('\t')[2],0))
    df_all['word_in_title'] = df_all['product_info'].map(lambda x:str_common_word(x.split('\t')[0],x.split('\t')[1]))
    df_all['word_in_description'] = df_all['product_info'].map(lambda x:str_common_word(x.split('\t')[0],x.split('\t')[2]))
    df_all['ratio_title'] = df_all['word_in_title']/df_all['len_of_query']
    df_all['ratio_description'] = df_all['word_in_description']/df_all['len_of_query'] #hack-r.com
    df_all['attr'] = df_all['search_term']+"\t"+df_all['brand']
    df_all['word_in_brand'] = df_all['attr'].map(lambda x:str_common_word(x.split('\t')[0],x.split('\t')[1])) #linkedin.com/in/datasci
    df_all['ratio_brand'] = df_all['word_in_brand']/df_all['len_of_brand']
    df_brand = pd.unique(df_all.brand.ravel())
    d={}
    i = 1
    for s in df_brand:
        d[s]=i
        i+=1
    df_all['brand_feature'] = df_all['brand'].map(lambda x:d[x])
    df_all['search_term_feature'] = df_all['search_term'].map(lambda x:len(x))
    df_train = df_all.iloc[:num_train]
    df_test = df_all.iloc[num_train:]
    id_test = df_test['id']
    y_train = df_train['relevance'].values
    X_train = df_train[:]
    X_test = df_test[:]
    print("--- Features Set: %s minutes ---" % round(((time.time() - start_time)/60),2))

    rfr = RandomForestRegressor(n_estimators = 175, n_jobs = 1, random_state = 2016, verbose = 0)
    tfidf = TfidfVectorizer(ngram_range=(1, 1), stop_words='english')
    tsvd = TruncatedSVD(n_components=25, random_state = 2016)
    clf = pipeline.Pipeline([
            ('union', FeatureUnion(
                        transformer_list = [
                            ('cst',  cust_regression_vals()),  
                            ('txt1', pipeline.Pipeline([('s1', cust_txt_col(key='search_term')), ('tfidf1', tfidf), ('tsvd1', tsvd)])),
                            ('txt2', pipeline.Pipeline([('s2', cust_txt_col(key='product_title')), ('tfidf2', tfidf), ('tsvd2', tsvd)])),
                            ('txt3', pipeline.Pipeline([('s3', cust_txt_col(key='product_description')), ('tfidf3', tfidf), ('tsvd3', tsvd)])),
                            ('txt4', pipeline.Pipeline([('s4', cust_txt_col(key='brand')), ('tfidf4', tfidf), ('tsvd4', tsvd)]))
                            ],
                        transformer_weights = {
                            'cst': 1.0,
                            'txt1': 0.5,
                            'txt2': 0.25,
                            'txt3': 0.0,
                            'txt4': 0.5
                            },
                    n_jobs = -1
                    )), 
            ('rfr', rfr)])
    param_grid = {'rfr__max_features': [24], 'rfr__max_depth': [29]}
    model = grid_search.GridSearchCV(estimator = clf, param_grid = param_grid, n_jobs = -1, cv = 2, verbose = 20, scoring=RMSE)
    model.fit(X_train, y_train)

    print("Best parameters found by grid search:")
    print(model.best_params_)
    print("Best CV score:")
    print(model.best_score_)

    y_pred = model.predict(X_test)
    y_train_pred = model.predict(X_train)
    print("--- Running Predictions for Training Data --")
    pd.DataFrame({"id": id_test, "relevance": y_pred}).to_csv('test_python_predictions.csv', index=False)
    print("--- Training & Testing: %s minutes ---" % ((time.time() - start_time)/60))
    print("--- Running Predictions for Training Data --")
    pd.DataFrame({"id": id_test, "relevance": y_train_pred}).to_csv('train_python_predictions.csv', index=False)
    print("-- Now Printing Training Data to File --")
    pd.DataFrame({"id": id_test, "Data": X_train}).to_csv('X_train.csv', index=False)
    print("-- Now Printing Testing Data to File --")
    pd.DataFrame({"id": id_test, "Data": X_train}).to_csv('X_test.csv', index=False)

Here is the error:

C:\Users\hackr\Desktop\hd>python wc.py

    Starting imports
    Reading in data
    pull brand
    dr_train.shape[0]
    Functions
    --- Features Set: 7.18 minutes ---
    Fitting 2 folds for each of 1 candidates, totalling 2 fits
    Process SpawnPoolWorker-3:
    Traceback (most recent call last):
      File "C:\python35\lib\multiprocessing\process.py", line 254, in _bootstrap
        self.run()
      File "C:\python35\lib\multiprocessing\process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "C:\python35\lib\multiprocessing\pool.py", line 108, in worker
        task = get()
      File "C:\python35\lib\site-packages\sklearn\externals\joblib\pool.py", line 360, in get
        return recv()
      File "C:\python35\lib\multiprocessing\connection.py", line 251, in recv
        return ForkingPickler.loads(buf.getbuffer())
    AttributeError: Can't get attribute 'cust_regression_vals' on <module '__mp_main__' from 'C:\\Users\\hackr\\Desktop\\hd\\wc.py'>
    Process SpawnPoolWorker-2:
    Traceback (most recent call last):
      File "C:\python35\lib\multiprocessing\process.py", line 254, in _bootstrap
        self.run()
      File "C:\python35\lib\multiprocessing\process.py", line 93, in run
        self._target(*self._args, **self._kwargs)
      File "C:\python35\lib\multiprocessing\pool.py", line 108, in worker
        task = get()
      File "C:\python35\lib\site-packages\sklearn\externals\joblib\pool.py", line 360, in get
        return recv()
      File "C:\python35\lib\multiprocessing\connection.py", line 251, in recv
        return ForkingPickler.loads(buf.getbuffer())
    AttributeError: Can't get attribute 'cust_regression_vals' on <module '__mp_main__' from 'C:\\Users\\hackr\\Desktop\\hd\\wc.py'>

I can't even kill this script without killing the Python process through Task Manager. If I use ctrl+C while it's running, it spews more errors and displays my keyboard interrupt on the screen, but it doesn't stop.

Comments

0 · Hack-R · 2/17/2016

I finally got it working, but I still don't understand exactly what the problem was or how I solved it. I'll add an answer once I figure it out. There was no message about unmet dependencies, but to better understand the technology in use I installed joblib separately and tried running some example code to see what was happening. It ran fine, and that installation may have contributed to the script working afterwards (?). The only other thing I know I changed is that I reduced the model's max depth.

A:

0 · Jason S · 2/16/2016 · #1

The most likely cause is that your class cust_regression_vals is defined only inside if __name__ == '__main__', which means it is only defined when the script is executed directly as the main program, not when it is treated as a module. Definitions generally shouldn't be conditional like that.
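A minimal sketch of that restructuring (the transform body is abbreviated from the question): the class moves to module level, where worker processes started with Windows' spawn method can re-import it by name when unpickling their tasks, and only the run-time work stays under the guard.

    # wc.py -- sketch only
    from sklearn.base import BaseEstimator, TransformerMixin

    # Module level: spawned workers re-import this module and must be able
    # to look the class up by name when unpickling the tasks they receive.
    class cust_regression_vals(BaseEstimator, TransformerMixin):
        def fit(self, x, y=None):
            return self
        def transform(self, hd_searches):
            d_col_drops = ['id', 'relevance', 'search_term', 'product_title',
                           'product_description', 'product_info', 'attr', 'brand']
            return hd_searches.drop(d_col_drops, axis=1).values

    if __name__ == '__main__':
        # Only the code that should run once, in the parent process, belongs
        # here: reading the CSVs, building features, fitting the model.
        pass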

Comments

0 · Jason S · 2/16/2016
Depending on what you're doing, you quite possibly do need if __name__ == '__main__' around your script. But don't put defs and classes inside the if block without a good reason.
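Concretely, a common layout that satisfies both points (the function name main is just illustrative):

    # Module level: imports, helper functions, and transformer classes live
    # here so that child processes can import them.

    def main():
        # read data, engineer features, run the grid search, write output
        pass

    if __name__ == '__main__':
        # Guard the entry point: on Windows, multiprocessing starts workers
        # with 'spawn', which re-imports this module, so any unguarded
        # top-level code would run again in every worker.
        main()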