Merge branch ‘dev’ into ‘master’ fix: 修复pb引用问题 See merge request byteair/volcengine-sdk-python-rec!30
Merge branch ‘dev’ into ‘master’
fix: 修复pb引用问题
See merge request byteair/volcengine-sdk-python-rec!30
volcengine data/predict api sdk, python version
import uuid from datetime import datetime, timedelta from byteair import ClientBuilder, Client from byteair.protocol.volcengine_byteair_pb2 import * from core import Option, NetException, BizException, metrics, MetricsOption # 必传,租户id. TENANT_ID = "xxx" # 必传,应用id. APPLICATION_ID = "xxx" # 必传,密钥AK. AK = "xxx" # 必传,密钥SK. SK = "xxx" # 必传,域名. HOST = "xxx" client: Client = ClientBuilder() \ .tenant_id(TENANT_ID) \ .application_id(APPLICATION_ID) \ .ak(AK) \ .sk(SK) \ .hosts([HOST]) \ .build() # metrics上报初始化.建议开启,方便火山侧排查问题. metrics.init((MetricsOption.with_metrics_domain(HOST),)) def write(): # 此处为测试数据,实际调用时需注意字段类型和格式 data_list = [ { "id": "item_id1", "title": "test_title1", "status": 0, "brand": "volcengine", "pub_time": 1583641807, "current_price": 1.1, }, { "id": "item_id2", "title": "test_title2", "status": 1, "brand": "volcengine", "pub_time": 1583641503, "current_price": 2.2, } ] # topic为枚举值,请参考API文档 topic = "item" # 传输天级数据 opts = ( # 预同步("pre_sync"), 历史数据同步("history_sync"), 增量天级同步("incremental_sync_daily"), # 增量实时同步("incremental_sync_streaming") Option.with_stage("pre_sync"), # 必传,数据产生日期,实际传输时需修改为实际日期 Option.with_data_date(datetime(year=2022, month=1, day=1)), Option.with_timeout(timedelta(milliseconds=1000)), Option.with_request_id(str(uuid.uuid1())), ) try: write_response = client.write_data(data_list, topic, *opts) except BizException as e: print("[write] occur err, msg: %s" % e) return if not write_response.status.success: print("[write] failure") return print("[write] success") return def done(): date_list = [datetime(year=2022, month=1, day=1)] # topic为枚举值,请参考API文档 topic = "item" opts = ( # 预同步("pre_sync"), 历史数据同步("history_sync"), 增量天级同步("incremental_sync_daily"), # 增量实时同步("incremental_sync_streaming") Option.with_stage("pre_sync"), Option.with_timeout(timedelta(milliseconds=1000)), Option.with_request_id(str(uuid.uuid1())), ) try: done_response = client.done(date_list, topic, *opts) except BizException as e: print("[done] occur err, msg: %s" % e) return if not done_response.status.success: print("[done] failure") return print("[done] success") return def predict(): # 构造predict请求体 predict_request = PredictRequest() user = predict_request.user user.uid = 'uid1' context = predict_request.context context.spm = "1$##$2$##$3$##$4" context.extra["extra_key"] = "extra_value" feature = context.feature feature.stringFeature["key"] = "value1" feature.stringFeature["key"] = "value2" feature.stringArrayFeature["array_key"].values.append("array_value1") feature.stringArrayFeature["array_key"].values.append("array_value2") filter = context.filter filter.stringFilter["key"] = "value" filter.stringArrayFilter["array_key"].values.append("array_value1") filter.stringArrayFilter["array_key"].values.append("array_value2") candidate_item1 = predict_request.candidateItems.add() candidate_item1.id = "item_id1" candidate_item2 = predict_request.candidateItems.add() candidate_item2.id = "item_id2" opts = ( Option.with_request_id(str(uuid.uuid1())), Option.with_scene("default"), Option.with_timeout(timedelta(milliseconds=1000)), Option.with_headers({"Enable-Spm-Route": "true"}) ) try: rsp = client.predict(predict_request, *opts) except (NetException, BizException) as e: print("[predict] occur error, msg: %s" % e) return if not rsp.success: print("[predict] failure") return print("[predict] success") def callback(): # 构造callback请求体 callback_request = CallbackRequest() # 对应的predict请求的request id callback_request.predict_request_id = "xxx" # 对应的predict请求的uid callback_request.uid = "uid1" # 对应的predict请求的scene. callback_request.scene = "default" # 对应的predict请求的items列表 callback_item1 = callback_request.items.add() callback_item1.id = "item_id1" callback_item1.pos = "position1" callback_item1.extra = "{\"reason\":\"exposure\"}" callback_item2 = callback_request.items.add() callback_item2.id = "item_id2" callback_item2.pos = "position2" callback_item2.extra = "{\"reason\":\"filter\"}" # callback请求上下文 callback_context = CallbackContext() callback_context.spm = "1$##$2$##$3$##$4" callback_request.context = callback_context opts = ( Option.with_request_id(str(uuid.uuid1())), Option.with_timeout(timedelta(milliseconds=1000)), ) try: rsp = client.callback(callback_request, *opts) except (NetException, BizException) as e: print("[callback] occur error, msg: %s" % e) return if not rsp.success: print("[callback] failure") return print("[callback] success")
版权所有:中国计算机学会技术支持:开源发展技术委员会 京ICP备13000930号-9 京公网安备 11010802032778号
volcengine data/predict api sdk, python version