sdk
The following describes the classes and methods of the AOP SDK.
class core.AOP
Bases: object
async static init(config, task_type=None)
Initialize an AOP node.
- Parameters:
- config (AOPConfig) – AOP configuration object
- task_type (Type[BaseTask], optional) – Task type. Defaults to None.
- Returns: AOP node object
- Return type: AOP
- Example:
>>> aop = await AOP.init(config, sdk.ExampleTask)
static init_node(config, max_buffer_size=None, max_data_upload_bandwidth=None, reconnect_interval=60.0)
Initialize an AOP edge node.
- Parameters:
- config (AOPConfig) – AOP configuration
- max_buffer_size (Optional[str], optional) – Maximum size of the data-upload buffer. Defaults to None.
- max_data_upload_bandwidth (Optional[str], optional) – Maximum data-upload bandwidth. Defaults to None.
- Returns: AOP node object
- Return type: AOP
- Example:
>>> AOP.init_node(config, "5MB", "10Mbps")
async create_agent(agent_type, virtual_env=False, agent_id=None)
Create an agent object.
- Parameters:
- agent_type (Type[AGENT]) – Agent type
- virtual_env (bool, optional) – Whether the agent runs in a virtual environment. Defaults to False.
- agent_id (Optional[str]) – Agent id
- Returns: Agent object
- Return type: AGENT
- Example:
>>> agent: sdk.ExampleAgent = await aop.create_agent(sdk.ExampleAgent)
async create_env(env_type)
Create an environment object.
- Parameters: env_type (Type[ENV]) – Environment type
- Returns: Environment object
- Return type: ENV
- Example:
>>> env: sdk.ExampleEnv = await aop.create_env(sdk.ExampleEnv)
async get_public_agent(agent_type)
Get a public agent object.
- Parameters: agent_type (Type[Any]) – Public agent type
- Returns: Public agent object
- Return type: A
- Example:
>>> agent: ExampleAgent = await aop.get_public_agent(ExampleAgent)
async get_agent(agent_type, agent_id, virtual_env=False)
Get an agent object by agent_id.
- Parameters:
- agent_type (Type[AGENT]) – Agent type
- agent_id (str) – Agent id
- virtual_env (bool, optional) – Whether the agent runs in a virtual environment. Defaults to False.
- Returns: Agent object
- Return type: AGENT
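- Example (a minimal sketch; the agent id value is a placeholder):
>>> agent: sdk.ExampleAgent = await aop.get_agent(sdk.ExampleAgent, "example-agent-id")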
common.context_manager.open_session()
Open an asynchronous session. All operations inside the session share the same session_id.
- Example:
>>> async with open_session():
...     session_id = get_session_id()
common.context_manager.get_session_id()
Get the session_id of the current session.
- Returns: session_id
- Return type: str
- Example:
>>> session_id: str = get_session_id()
class common.context_manager.Context(transaction_id: str, session_id: str)
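- Example (a minimal sketch constructing a Context directly from the signature above; the id values are placeholders):
>>> ctx = Context(transaction_id="tx-001", session_id="sess-001")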
class common.dataset_event.DatasetEventData(client_id, dataset_name, event_type, current_num, add_num, notify_num, server_id, namespace, cap_id=None, batch_max_time=None)
Dataset event callback data.
class common.dataset_event.DatasetEventType(value)
Dataset event type.
class common.dataset_event.DatasetEvent(dataset_name, event_type, num, start_time)
Dataset event.
- Returns: DatasetEvent instance
- Return type: DatasetEvent
unregister_event()
Unregister the event.
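- Example (a minimal sketch; assumes `event` is an existing DatasetEvent instance, as in the examples below):
>>> event.unregister_event()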
async subscribe_callback_async(callback_fn)
Register an event callback function; does not block the main thread.
- Parameters: callback_fn (Callable[Any]) – Callback function
- Example:
>>> def fn(data: DatasetEventData):
...     pass
>>> await event.subscribe_callback_async(fn)
async subscribe_callback(callback_fn)
Register an event callback function.
- Parameters: callback_fn (Callable[Any]) – Callback function
- Example:
>>> def fn(data: DatasetEventData):
...     pass
>>> await event.subscribe_callback(fn)
class utils.csv_reader.CSVReader(file_path='')
CSV file reader class.
- Returns: CSVReader instance
- Return type: CSVReader
get_json()
Convert the CSV file to JSON.
- Returns: List of JSON strings
- Return type: List[str]
- Example:
>>> csv_reader = CSVReader("path/to/file.csv")
>>> json_list: List[str] = csv_reader.get_json()
utils.idl_tool.import_py2(path)
Import the module at path and execute it.
- Parameters: path (str) – Module path
- Returns: The imported module
- Return type: ModuleType
- Example:
>>> module_type: ModuleType = import_py2("path/to/module.py")
utils.idl_tool.file_path_to_package_module(file_path)
Get the package name and module name for a Python file.
- Parameters: file_path (str) – Python file path
- Returns: The package name and module name of the Python file
- Return type: Tuple[str, str]
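- Example (a minimal sketch; the path is a placeholder):
>>> package_name, module_name = file_path_to_package_module("pkg/sub/module.py")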
utils.idl_tool.guess_lang_from_path(path)
Guess the programming language of a file from its path.
- Parameters: path (str) – File path
- Returns: Programming language type
- Return type: Optional[Literal["hoas.py", "idl.txt", "idl.json", "idl.bin", "python", "csharp"]]
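- Example (a minimal sketch; the path is a placeholder and the return value shown is an assumption based on the Literal values above):
>>> lang = guess_lang_from_path("agent.py")  # e.g. "python"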
utils.idl_tool.load_idl(input_file)
Load IDL from a file; the file can be a Python or JSON file.
- Parameters: input_file (str) – File path
- Raises: RuntimeError – Raised for file formats other than Python and JSON
- Returns: IDL runtime object
- Return type: IDL.AopTaskInfo
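- Example (a minimal sketch; the path is a placeholder):
>>> task_info: IDL.AopTaskInfo = load_idl("path/to/task.py")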
class runtime.dataset.TagFilter(must=None, should=None, must_not=None)
Tag-based data filter. Data that satisfies all of the following conditions is selected (see the sketch after this list).
- Parameters:
- must (list[str] | None) – Select data containing all tags in must. When must is [], data without any tags is returned; when must is None, all data is returned. Defaults to None.
- should (list[str] | None) – Select data containing at least one tag in should. When should is [], data without any tags is returned; when should is None, all data is returned. Defaults to None.
- must_not (list[str] | None) – Select data that contains none of the tags in must_not. When must_not is [], data containing any tag is returned; when must_not is None, all data is returned. Defaults to None.
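- Example (a minimal sketch combining the conditions; tag names are hypothetical):
>>> tag_filter = TagFilter(must=["train"], must_not=["deprecated"])
>>> # selects data tagged "train" that is not tagged "deprecated"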
class runtime.dataset.RecordRef(*args, **kwds)
property record_data: RecordData[I, O]
Get input, output, traceInfo, and metadata from the RecordRef as a RecordData.
- Returns: RecordData object
- Examples:
>>> record_ref = RecordRef(1, 'input', 'output', TraceInfo('agentName', 'capType', 'capName', 'taskType', 'agentId', 'traceId', 'episodeId'), MetaData(), [])
>>> record_ref.record_data
RecordData(input='input', output='output', traceInfo=TraceInfo(agentName='agentName', capType='capType', capName='capName', taskType='taskType', agentId='agentId', traceId='traceId', episodeId='episodeId'), metadata=MetaData(), tag=[])
async update(data)
Update a record.
- Parameters: data (RecordData[I, O]) – The data to update
- Returns: Whether the update succeeded
- Return type: bool
- Examples:
>>> record_ref = RecordRef(1, 'input', 'output', TraceInfo('agentName', 'capType', 'capName', 'taskType', 'agentId', 'traceId', 'episodeId'), MetaData())
>>> await record_ref.update(RecordData('input', 'output', TraceInfo('agentName', 'capType', 'capName', 'taskType', 'agentId', 'traceId', 'episodeId'), MetaData()))
True
async delete()
Delete a record.
- Returns: Whether the deletion succeeded
- Return type: bool
- Examples:
>>> record_ref = RecordRef(1, 'input', 'output', TraceInfo('agentName', 'capType', 'capName', 'taskType', 'agentId', 'traceId', 'episodeId'), MetaData())
>>> await record_ref.delete()
class runtime.dataset.RecordData(*args, **kwds)
class runtime.dataset.Dataset(*args, **kwds)
async static init(config, run_mode='local')
Initialize a Dataset.
- Parameters:
- config (AOPConfig) – AOP configuration; must include project_id, key, secret, server_id, platform_service_url
- run_mode (str, optional) – Run mode. Defaults to 'local'.
- Returns: Dataset object
- Return type: Dataset[Any, Any]
- Examples:
>>> from fuxi.aop.config import AOPConfig
>>> def aop_config() -> AOPConfig:
... return {
... "project_id": "1561",
... "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
... "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
... "server_id": "4145",
... "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
... }
>>> dataset = await Dataset.init(aop_config())
async create(table_name, /, schema)
Create a new dataset. If the dataset already exists, the existing dataset is used.
- Parameters:
- table_name (str) – Dataset name
- schema (Tuple[Type[I], Type[O]]) – Types of the input and output data
- Returns: A Dataset object for the given dataset name
- Return type: Dataset[I, O]
- Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
... return {
... "project_id": "1561",
... "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
... "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
... "server_id": "4145",
... "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
... }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
... label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
... label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> new_dataset = await dataset.create(table_name, schema = (CapInput, CapOutput))
async create_nx(table_name, /, schema)
Create a new dataset. If the dataset already exists, an exception is raised.
- Parameters:
- table_name (str) – Dataset name
- schema (Tuple[Type[I], Type[O]]) – Types of the input and output data
- Raises: DatasetAlreadyExistsError – The dataset already exists
- Returns: Dataset object
- Return type: Dataset[I, O]
- Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
... return {
... "project_id": "1561",
... "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
... "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
... "server_id": "4145",
... "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
... }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
... label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
... label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> table = await dataset.create_nx(table_name, schema = (CapInput, CapOutput))
async load(table_name, /, schema)
Load an existing dataset (by table path).
- Parameters:
- table_name (str) – Dataset table path
- schema (Tuple[Type[I], Type[O]]) – Types of the input and output data
- Returns: Dataset object
- Return type: Dataset[I, O]
- Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
... return {
... "project_id": "1561",
... "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
... "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
... "server_id": "4145",
... "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
... }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
... label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
... label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> table = await dataset.load(table_name, schema = (CapInput, CapOutput))
async load_from_cap(cap_path, /, schema)
Load an existing dataset (by capability path).
- Parameters:
- cap_path (str) – Capability path, in the format task.agent.cap
- schema (Tuple[Type[I], Type[O]]) – Types of the input and output data
- Returns: Dataset object
- Return type: Dataset[I, O]
- Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
... return {
... "project_id": "1561",
... "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
... "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
... "server_id": "4145",
... "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
... }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
... label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
... label: Literal[1, 2, 3]
...
>>> cap_path = 'task.agent.cap'
>>> table = await dataset.load_from_cap(cap_path, schema = (CapInput, CapOutput))
async fetch(offset=0, limit=10000, *, cols=None, where=None, order_by=None, tag=None)
Fetch (download) data from the dataset; sorted by createTime in ascending order by default.
- Parameters:
- offset (int) – Offset. Defaults to 0.
- limit (int) – Maximum number of records. Defaults to 10000, which is also the maximum value.
- order_by (dict | None, optional) – Sort keys and directions, e.g. {"dataId": "asc"}. Defaults to None.
- tag (TagFilter | None, optional) – Tag filter conditions. Defaults to None.
- Returns: List of record objects
- Return type: list[RecordRef[I, O]]
- Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
... return {
... "project_id": "1561",
... "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
... "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
... "server_id": "4145",
... "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
... }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
... label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
... label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> table = await dataset.load(table_name, schema = (CapInput, CapOutput))
>>> await table.fetch(offset=10, limit=100, order_by={"agentId": "asc", "dataId": "asc"})
async query(id=None, tag=None)
Query specific records.
- Parameters:
- id (int | None) – Record id
- tag (TagFilter | None) – Tag filter conditions
- Returns: List of record objects
- Return type: list[RecordRef[I, O]]
- Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
... return {
... "project_id": "1561",
... "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
... "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
... "server_id": "4145",
... "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
... }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
... label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
... label: Literal[1, 2, 3]
...
>>> table = await dataset.load('task.agent.cap', schema = (CapInput, CapOutput))
>>> await table.query(id=1)
>>> await table.query(tag=TagFilter(must=["tag1", "tag2"]))
async upload(data_list)
Upload data to the dataset.
- Parameters: data_list (list[RecordData[I, O]]) – List of data objects
- Returns: Whether the upload succeeded
- Return type: bool
- Examples:
>>> @dataclass
... class CapInput:
... label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
... label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> table = await dataset.load(table_name, schema = (CapInput, CapOutput))
>>> await table.upload([RecordData(CapInput(1), CapOutput(1))])
set_serializer(input_seri, output_seri)
Set the serializers.
- Parameters:
- input_seri (Serializer[Any]) – Serializer for cap input
- output_seri (Serializer[Any]) – Serializer for cap output
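- Example (a minimal sketch; `my_input_serializer` and `my_output_serializer` are hypothetical Serializer[Any] instances):
>>> table.set_serializer(my_input_serializer, my_output_serializer)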
async load_view_history()
Load the view-history dataset.
- Returns: The view-history dataset
- Return type: DatasetViewHistoryDataset
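- Example (a minimal sketch; assumes `table` is an initialized Dataset instance):
>>> history = await table.load_view_history()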