Asked by: swygerts  Asked: 9/8/2020  Last edited by: swygerts  Updated: 9/12/2020  Views: 736
Submitting Python Script as Dataflow Job
Q:
I have a simple processing job that I would like to submit as a Dataflow job. It is just a Python file that takes a CSV as input, applies two models to it, and appends the output to a CSV. How do I submit this as a Dataflow job? I can run it on my local machine, but whenever I try to submit it to GCP I get this error:
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
Traceback (most recent call last):
File "predict_DF.py", line 119, in <module>
run()
File "predict_DF.py", line 113, in run
p.run()
File "/opt/anaconda3/lib/python3.7/site-
packages/apache_beam/pipeline.py", line 513, in run
allow_proto_holders=True).run(False)
File "/opt/anaconda3/lib/python3.7/site-
packages/apache_beam/pipeline.py", line 526, in run
return self.runner.run_pipeline(self, self._options)
File "/opt/anaconda3/lib/python3.7/site-
packages/apache_beam/runners/dataflow/dataflow_runner.py", line 582,
in run_pipeline
self.dataflow_client.create_job(self.job), self)
File "/opt/anaconda3/lib/python3.7/site-
packages/apache_beam/utils/retry.py", line 236, in wrapper
return fun(*args, **kwargs)
File "/opt/anaconda3/lib/python3.7/site-
packages/apache_beam/runners/dataflow/internal/apiclient.py", line
659, in create_job
self.create_job_description(job)
File "/opt/anaconda3/lib/python3.7/site-
packages/apache_beam/runners/dataflow/internal/apiclient.py", line
712, in create_job_description
io.BytesIO(job.proto_pipeline.SerializeToString()))
File "/opt/anaconda3/lib/python3.7/site-
packages/apache_beam/runners/dataflow/internal/apiclient.py", line
637, in stage_file
response = self._storage_client.objects.Insert(request, upload=upload)
File "/opt/anaconda3/lib/python3.7/site- packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1156, in Insert
upload=upload, upload_config=upload_config)
File "/opt/anaconda3/lib/python3.7/site-packages/apitools/base/py/base_api.py", line 729, in _RunMethod
http, http_request, **opts)
File "/opt/anaconda3/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 350, in MakeRequest
check_response_func=check_response_func)
File "/opt/anaconda3/lib/python3.7/site-packages/apitools/base/py/http_wrapper.py", line 400, in _MakeRequestNoRetry
redirections=redirections, connection_type=connection_type)
File "/opt/anaconda3/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
redirections, connection_type)
File "/opt/anaconda3/lib/python3.7/site-packages/oauth2client/transport.py", line 169, in new_request
redirections, connection_type)
File "/opt/anaconda3/lib/python3.7/site-packages/httplib2/__init__.py", line 1991, in request
cachekey,
File "/opt/anaconda3/lib/python3.7/site-packages/httplib2/__init__.py", line 1651, in _request
conn, request_uri, method, body, headers
File "/opt/anaconda3/lib/python3.7/site-packages/httplib2/__init__.py", line 1558, in _conn_request
conn.request(method, request_uri, body, headers)
File "/opt/anaconda3/lib/python3.7/http/client.py", line 1252, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/opt/anaconda3/lib/python3.7/http/client.py", line 1298, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/opt/anaconda3/lib/python3.7/http/client.py", line 1247, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/opt/anaconda3/lib/python3.7/http/client.py", line 1065, in _send_output
self.send(chunk)
File "/opt/anaconda3/lib/python3.7/http/client.py", line 987, in send
self.sock.sendall(data)
File "/opt/anaconda3/lib/python3.7/ssl.py", line 1034, in sendall
v = self.send(byte_view[count:])
File "/opt/anaconda3/lib/python3.7/ssl.py", line 1003, in send
return self._sslobj.write(data)
socket.timeout: The write operation timed out
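The traceback ends in a socket.timeout raised while the Dataflow client is uploading (staging) the serialized pipeline to the Cloud Storage staging bucket over HTTPS. As a quick sanity check, not part of the original question and with a placeholder project and bucket name, one could verify that the same credentials can write a small object to that bucket at all:

# Hypothetical sanity check: can these credentials write to the bucket used
# for --staging_location? Project id and bucket name below are placeholders.
from google.cloud import storage

client = storage.Client(project="my-project")            # placeholder project id
bucket = client.bucket("my-dataflow-staging-bucket")      # placeholder bucket name
blob = bucket.blob("staging-write-test.txt")
blob.upload_from_string("dataflow staging write test")    # small test upload
print("Upload succeeded")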
Apart from the imports, here is my code:
Command line
python predict_DF.py \
--region $REGION \
--input $INPUT \
--output $OUTPUT \
--text_extraction_model $TEXT_EXTRACTION_MODEL \
--model $MODEL \
--translation_dictionary $TRANSLATION_DICTIONARY \
--runner $RUNNER \
--project $PROJECT \
--temp_location $TEMP_LOCATION \
--staging_location $STAGING
predict_DF.py
def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the prediction pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=True,
        default='/Users/me/input.csv',
        help='Input file for predictions'
    )
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        default='/Users/me/output.csv',
        help='Output file to write results to'
    )
    parser.add_argument(
        '--model',
        dest='model',
        required=True,
        default='/Users/me/model.joblib'
    )
    parser.add_argument(
        '--text_extraction_model',
        dest='text_extraction_model',
        required=True,
        default='/Users/me/tfidfit',
        help='Text extraction model for processing input'
    )
    parser.add_argument(
        '--translation_dictionary',
        dest='translation_dictionary',
        required=True,
        default='/Users/me/id_to_category',
        help='Dictionary relating numerical id to classification group'
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)

    # The models and the id-to-category dictionary are loaded at
    # pipeline-construction time and captured by the closures below.
    model = joblib.load(known_args.model)
    tfidfit = pickle.load(open(known_args.text_extraction_model, 'rb'))
    id_to_category = pickle.load(open(known_args.translation_dictionary, 'rb'))

    def get_pred(x):
        pred = id_to_category[model.predict(tfidfit.transform([x]).toarray()).tolist()[0]]
        return pred

    def parse_method(line):
        # Parse one CSV line, append the prediction for column 16, re-join as CSV.
        import csv
        reader = csv.reader(line.split('\n'))
        for csv_row in reader:
            values = [x for x in csv_row]
            row = []
            for value in csv_row:
                if value is not None:
                    row.append(value)
            row.append(get_pred(row[16]))
            return ",".join(row)

    lines = (p | beam.io.ReadFromText(known_args.input) |
             'Split' >> beam.Map(lambda s: parse_method(s)) |
             'Output to file' >> beam.io.WriteToText(known_args.output)
             )
    p.run()


run()
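Not part of the original question, but for context: the code above loads the models in the launching process and captures them in closures, so they are pickled and shipped with the pipeline. A pattern often used with Dataflow instead is to load them inside a DoFn's setup(), so each worker loads its own copy. A minimal sketch of that idea, assuming the paths passed in are readable from the workers (for example, files staged onto the worker image or downloaded inside setup()):

# Sketch only: load the models once per worker in DoFn.setup() instead of
# capturing them in a closure at submit time. Paths are hypothetical.
import apache_beam as beam

class PredictDoFn(beam.DoFn):
    def __init__(self, model_path, tfidf_path, dict_path):
        # Only the small path strings are pickled with the pipeline.
        self._model_path = model_path
        self._tfidf_path = tfidf_path
        self._dict_path = dict_path

    def setup(self):
        # Runs once per worker before processing begins.
        import pickle
        import joblib
        self._model = joblib.load(self._model_path)
        with open(self._tfidf_path, 'rb') as f:
            self._tfidfit = pickle.load(f)
        with open(self._dict_path, 'rb') as f:
            self._id_to_category = pickle.load(f)

    def process(self, line):
        import csv
        row = next(csv.reader([line]))
        pred = self._id_to_category[
            self._model.predict(self._tfidfit.transform([row[16]]).toarray())[0]]
        yield ",".join(row + [str(pred)])

With that class, the middle pipeline step would be written as 'Predict' >> beam.ParDo(PredictDoFn(model_path, tfidf_path, dict_path)).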
A: No answers yet
Comments
gcloud dataflow jobs list