AOP SDK Development Guide

The following describes the classes and methods in the AOP SDK.

class core.AOP

Bases: object

async static init(config, task_type=None)

Initialize an AOP node.

  • Parameters:
    • config (AOPConfig) – the AOP configuration object
    • task_type (Type[BaseTask], optional) – the task type. Defaults to None.
  • Returns: the AOP node object
  • Return type: AOP
  • Example:
>>> await AOP.init(config, sdk.ExampleTask)

static init_node(config, max_buffer_size=None, max_data_upload_bandwidth=None, reconnect_interval=60.0)

Initialize an AOP edge-side node.

  • Parameters:
    • config (AOPConfig) – the AOP configuration
    • max_buffer_size (Optional[str], optional) – the data-upload buffer size. Defaults to None.
    • max_data_upload_bandwidth (Optional[str], optional) – the data-upload bandwidth. Defaults to None.
    • reconnect_interval (float, optional) – the reconnect interval. Defaults to 60.0.
  • Returns: the AOP node object
  • Return type: AOP
  • Example:
>>> AOP.init_node(config, "5MB", "10Mbps")

async create_agent(agent_type, virtual_env=False, agent_id=None)

Create an agent object.

  • Parameters:
    • agent_type (Type[AGENT]) – the agent type
    • virtual_env (bool, optional) – whether the agent runs in a virtual environment. Defaults to False.
    • agent_id (Optional[str]) – the agent id
  • Returns: the agent object
  • Return type: AGENT
  • Example:
>>> agent: sdk.ExampleAgent = await aop.create_agent(sdk.ExampleAgent)

async create_env(env_type)

Create an environment object.

  • Parameters: env_type (Type[ENV]) – the environment type
  • Returns: the environment object
  • Return type: ENV
  • Example:
>>> env: sdk.ExampleEnv = await aop.create_env(sdk.ExampleEnv)

async get_public_agent(agent_type)

Get a public agent object.

  • Parameters: agent_type (Type[Any]) – the public agent type
  • Returns: the public agent object
  • Return type: A
  • Example:
>>> agent: ExampleAgent = await aop.get_public_agent(ExampleAgent)

async get_agent(agent_type, agent_id, virtual_env=False)

Get an agent object by agent_id.

  • Parameters:
    • agent_type (Type[AGENT]) – the agent type
    • agent_id (str) – the agent id
    • virtual_env (bool, optional) – whether the agent runs in a virtual environment. Defaults to False.
  • Returns: the agent object
  • Return type: AGENT

common.context_manager.open_session()

Open an asynchronous session. All operations inside the session share the same session_id.

  • Example:
>>> async with open_session():
...     session_id = get_session_id()

common.context_manager.get_session_id()

Get the session_id of the current session.

  • Returns: the session_id
  • Return type: str
  • Example:
>>> session_id: str = get_session_id()
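
The session-scoping behavior above can be pictured with Python's `contextvars`. The following is a minimal, hypothetical sketch (not the SDK's actual implementation) of how `open_session`/`get_session_id`-style helpers can give every operation inside one `async with` block the same session_id:

```python
import asyncio
import contextvars
import uuid
from contextlib import asynccontextmanager

# Context variable holding the current session id (None outside a session).
_session_id: contextvars.ContextVar = contextvars.ContextVar(
    "session_id", default=None
)

@asynccontextmanager
async def open_session():
    """Open a session: everything inside shares one session_id."""
    token = _session_id.set(uuid.uuid4().hex)
    try:
        yield
    finally:
        _session_id.reset(token)

def get_session_id():
    """Return the session_id of the current session, if any."""
    return _session_id.get()

async def main():
    async with open_session():
        # Both calls observe the same session_id.
        return [get_session_id(), get_session_id()]

ids = asyncio.run(main())
```

Because the id lives in a context variable rather than a global, concurrent tasks can each hold their own session without interfering.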

class common.context_manager.Context(transaction_id: str, session_id: str)

class common.dataset_event.DatasetEventData(client_id, dataset_name, event_type, current_num, add_num, notify_num, server_id, namespace, cap_id=None, batch_max_time=None)

Dataset event callback data.

class common.dataset_event.DatasetEventType(value)

Dataset event type.

class common.dataset_event.DatasetEvent(dataset_name, event_type, num, start_time)

Dataset event.

unregister_event()

Unregister the event.

async subscribe_callback_async(callback_fn)

Register an event callback function; does not block the main thread.

  • Parameters: callback_fn (Callable[Any]) – the callback function
  • Example:
>>> def fn(data: DatasetEventData):
>>>     pass
>>> await event.subscribe_callback_async(fn)
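
The non-blocking behavior can be illustrated with plain asyncio. The `Event` class and `emit` method below are illustrative stand-ins, not the SDK API; the point is that callbacks are scheduled on the event loop instead of being invoked inline:

```python
import asyncio

class Event:
    """Illustrative event: registered callbacks run as background tasks."""

    def __init__(self):
        self._callbacks = []

    async def subscribe_callback_async(self, callback_fn):
        # Registering only stores the callback; nothing blocks here.
        self._callbacks.append(callback_fn)

    def emit(self, data):
        # Each callback runs in a worker thread via a scheduled task,
        # so emit() returns immediately instead of waiting on callbacks.
        return [asyncio.create_task(asyncio.to_thread(fn, data))
                for fn in self._callbacks]

received = []

def fn(data):
    received.append(data)

async def main():
    event = Event()
    await event.subscribe_callback_async(fn)
    await asyncio.gather(*event.emit({"dataset_name": "demo"}))

asyncio.run(main())
```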

async subscribe_callback(callback_fn)

Register an event callback function.

  • Parameters: callback_fn (Callable[Any]) – the callback function
  • Example:
>>> def fn(data: DatasetEventData):
>>>     pass
>>> await event.subscribe_callback(fn)

class utils.csv_reader.CSVReader(file_path='')

CSV file reader class.

  • Returns: a CSVReader instance
  • Return type: CSVReader

get_json()

Convert the CSV file to JSON.

  • Returns: a list of JSON strings
  • Return type: List[str]
  • Example:
>>> json_list: List[str] = csv_reader.get_json()
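
The row-to-JSON mapping can be approximated with the standard library. This is an illustrative re-implementation under the assumption that the first CSV row is a header and each subsequent row becomes one JSON object string; it is not the `CSVReader` source:

```python
import csv
import io
import json
from typing import List

def csv_to_json(text: str) -> List[str]:
    """Convert CSV text into a list of JSON strings, one per data row.

    The first row is treated as the header, so each JSON object maps
    column names to that row's values.
    """
    reader = csv.DictReader(io.StringIO(text))
    return [json.dumps(row, ensure_ascii=False) for row in reader]

rows = csv_to_json("name,label\nalice,1\nbob,2\n")
```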

utils.idl_tool.import_py2(path)

Import and execute the module at path.

  • Parameters: path (str) – the module path
  • Returns: the module type
  • Return type: ModuleType
  • Example:
>>> module_type: ModuleType = import_py2("py path")

utils.idl_tool.file_path_to_package_module(file_path)

Get the package name and module name for a Python file.

  • Parameters: file_path (str) – the Python file path
  • Returns: the package name and module name of the Python file
  • Return type: Tuple[str, str]
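
One plausible way such a mapping works, sketched here as an assumption rather than the actual `idl_tool` logic: take the containing directory as the package and the file name without its `.py` suffix as the module.

```python
import os
from typing import Tuple

def file_path_to_package_module(file_path: str) -> Tuple[str, str]:
    """Derive (package, module) names from a Python file path.

    Illustrative only: the package is taken to be the immediate
    containing directory and the module is the stem of the file name.
    """
    directory, file_name = os.path.split(file_path)
    package = os.path.basename(directory)
    module, _ext = os.path.splitext(file_name)
    return package, module

pkg, mod = file_path_to_package_module("project/tasks/example_task.py")
```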

utils.idl_tool.guess_lang_from_path(path)

Guess the programming language of a file from its path.

  • Returns: the programming-language type
  • Return type: Optional[Literal["hoas.py", "idl.txt", "idl.json", "idl.bin", "python", "csharp"]]
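
The return literals suggest a suffix-based lookup. The following sketch is a guess at that behavior (the suffix table and its ordering are assumptions, not the SDK's actual rules), with more specific multi-part suffixes checked before the generic `.py` match:

```python
# Ordered so that multi-part suffixes win over the generic ".py" match.
_SUFFIXES = [
    (".hoas.py", "hoas.py"),
    (".idl.txt", "idl.txt"),
    (".idl.json", "idl.json"),
    (".idl.bin", "idl.bin"),
    (".py", "python"),
    (".cs", "csharp"),
]

def guess_lang_from_path(path: str):
    """Guess the language type of a file by its path suffix."""
    for suffix, lang in _SUFFIXES:
        if path.endswith(suffix):
            return lang
    return None
```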

utils.idl_tool.load_idl(input_file)

Load an IDL from a file; the file may be a Python or JSON file.

  • Parameters: input_file (str) – the file path
  • Raises: RuntimeError – raised for file formats other than Python and JSON
  • Returns: the IDL runtime object
  • Return type: IDL.AopTaskInfo

class runtime.dataset.TagFilter(must=None, should=None, must_not=None)

Tag filter for data. Data satisfying all of the following conditions are selected.

  • Parameters:
    • must (list[str] | None) – select data that contains every tag in must. When must is [], return only data with no tags; when must is None, return all data. Defaults to None.
    • should (list[str] | None) – select data that contains at least one tag in should. When should is [], return only data with no tags; when should is None, return all data. Defaults to None.
    • must_not (list[str] | None) – select data that contains none of the tags in must_not. When must_not is [], return data that has any tag; when must_not is None, return all data. Defaults to None.
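
The combined must/should/must_not rules can be pinned down with a small plain-Python predicate. This models the documented behavior only; it is not the SDK's query engine:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class TagFilter:
    must: Optional[List[str]] = None
    should: Optional[List[str]] = None
    must_not: Optional[List[str]] = None

def matches(tags: List[str], f: TagFilter) -> bool:
    """Apply the documented TagFilter rules to one record's tag list."""
    if f.must is not None:
        if f.must == []:
            if tags:                      # [] means: only untagged data
                return False
        elif not all(t in tags for t in f.must):
            return False
    if f.should is not None:
        if f.should == []:
            if tags:                      # [] means: only untagged data
                return False
        elif not any(t in tags for t in f.should):
            return False
    if f.must_not is not None:
        if f.must_not == []:
            if not tags:                  # [] means: only tagged data
                return False
        elif any(t in tags for t in f.must_not):
            return False
    return True
```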

class runtime.dataset.RecordRef(*args, **kwds)

property record_data : RecordData[I, O]

Build a RecordData from the RecordRef's input, output, traceInfo, and metadata.

  • Returns: the RecordData object
  • Examples:
>>> record_ref = RecordRef(1, 'input', 'output', TraceInfo('agentName', 'capType', 'capName', 'taskType', 'agentId', 'traceId', 'episodeId'), MetaData(), [])
>>> record_ref.record_data
RecordData(input='input', output='output', traceInfo=TraceInfo(agentName='agentName', capType='capType', capName='capName', taskType='taskType', agentId='agentId', traceId='traceId', episodeId='episodeId'), metadata=MetaData(), tag=[])

async update(data)

Update the data of a record.

  • Parameters: data (RecordData[I, O]) – the data to update
  • Returns: whether the update succeeded
  • Return type: bool
  • Examples:
>>> record_ref = RecordRef(1, 'input', 'output', TraceInfo('agentName', 'capType', 'capName', 'taskType', 'agentId', 'traceId', 'episodeId'), MetaData())
>>> await record_ref.update(RecordData('input', 'output', TraceInfo('agentName', 'capType', 'capName', 'taskType', 'agentId', 'traceId', 'episodeId'), MetaData()))
True

async delete()

Delete a record.

  • Returns: whether the deletion succeeded
  • Return type: bool
  • Examples:
>>> record_ref = RecordRef(1, 'input', 'output', TraceInfo('agentName', 'capType', 'capName', 'taskType', 'agentId', 'traceId', 'episodeId'), MetaData())
>>> await record_ref.delete()

class runtime.dataset.RecordData(*args, **kwds)

class runtime.dataset.Dataset(*args, **kwds)

async static init(config, run_mode='local')

Initialize a Dataset.

  • Parameters: config (AOPConfig) – the AOP configuration; it must include project_id, key, secret, server_id, and platform_service_url
  • Returns: the Dataset object
  • Return type: Dataset[Any, Any]
  • Examples:
>>> from fuxi.aop.config import AOPConfig
>>> def aop_config() -> AOPConfig:
...     return {
...         "project_id": "1561",
...         "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
...         "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
...         "server_id": "4145",
...         "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
...     }
>>> dataset = await Dataset.init(aop_config())

async create(table_name, /, schema)

Create a new dataset. If the dataset already exists, the existing dataset is used.

  • Parameters:
    • table_name (str) – the dataset name
    • schema (Tuple[Type[I], Type[O]]) – the input and output data types
  • Returns: a Dataset object for the given dataset name
  • Return type: Dataset[I, O]
  • Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
...     return {
...         "project_id": "1561",
...         "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
...         "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
...         "server_id": "4145",
...         "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
...     }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
...     label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
...     label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> new_dataset = await dataset.create(table_name, schema = (CapInput, CapOutput))

async create_nx(table_name, /, schema)

Create a new dataset. If the dataset already exists, an exception is raised.

  • Parameters:
    • table_name (str) – the dataset name
    • schema (Tuple[Type[I], Type[O]]) – the input and output data types
  • Raises: DatasetAlreadyExistsError – the dataset already exists
  • Returns: the Dataset object
  • Return type: Dataset[I, O]
  • Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
...     return {
...         "project_id": "1561",
...         "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
...         "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
...         "server_id": "4145",
...         "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
...     }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
...     label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
...     label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> table = await dataset.create_nx(table_name, schema = (CapInput, CapOutput))

async load(table_name, /, schema)

Load an existing dataset (by table path).

  • Parameters:
    • table_name (str) – the dataset table path
    • schema (Tuple[Type[I], Type[O]]) – the input and output data types
  • Returns: the Dataset object
  • Return type: Dataset[I, O]
  • Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
...     return {
...         "project_id": "1561",
...         "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
...         "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
...         "server_id": "4145",
...         "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
...     }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
...     label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
...     label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> table = await dataset.load(table_name, schema = (CapInput, CapOutput))

async load_from_cap(cap_path, /, schema)

Load an existing dataset (by capability path).

  • Parameters:
    • cap_path (str) – the capability path, in the format task.agent.cap
    • schema (Tuple[Type[I], Type[O]]) – the input and output data types
  • Returns: the Dataset object
  • Return type: Dataset[I, O]
  • Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
...     return {
...         "project_id": "1561",
...         "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
...         "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
...         "server_id": "4145",
...         "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
...     }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
...     label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
...     label: Literal[1, 2, 3]
...
>>> cap_path = 'task.agent.cap'
>>> table = await dataset.load_from_cap(cap_path, schema = (CapInput, CapOutput))

async fetch(offset=0, limit=10000, *, cols=None, where=None, order_by=None, tag=None)

Fetch (download) data from the dataset, sorted by createTime in ascending order by default.

  • Parameters:
    • offset (int) – the offset. Defaults to 0.
    • limit (int) – the number of records to fetch. Defaults to 10000, which is also the maximum.
  • Returns: a list of record objects
  • Return type: list[RecordRef[I, O]]
  • Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
...     return {
...         "project_id": "1561",
...         "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
...         "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
...         "server_id": "4145",
...         "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
...     }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
...     label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
...     label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> table = await dataset.load(table_name, schema = (CapInput, CapOutput))
>>> await table.fetch(offset=10, limit=100, order_by={"agentId": "asc", "dataId": "asc"})
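
Because limit is capped at 10000, downloading a larger dataset means paging with offset. Here is a generic paging loop, sketched against a stand-in fetch coroutine rather than a real Dataset (the `fake_fetch` helper and its data are illustrative):

```python
import asyncio

PAGE_SIZE = 10000  # fetch's documented per-call maximum

async def fetch_all(fetch, page_size: int = PAGE_SIZE) -> list:
    """Drain a paged async fetch(offset, limit) into one list."""
    records, offset = [], 0
    while True:
        page = await fetch(offset=offset, limit=page_size)
        records.extend(page)
        if len(page) < page_size:   # short page: no more data
            return records
        offset += page_size

# Stand-in for table.fetch: 25 fake records served in pages.
DATA = list(range(25))

async def fake_fetch(offset: int, limit: int) -> list:
    return DATA[offset:offset + limit]

result = asyncio.run(fetch_all(fake_fetch, page_size=10))
```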

async query(id=None, tag=None)

Query specific data.

  • Parameters:
    • id (int | None) – the id of the data
    • tag (TagFilter | None) – the tag filter for the data
  • Returns: a list of record objects
  • Return type: list[RecordRef[I, O]]
  • Examples:
>>> from fuxi.aop.config import AOPConfig
>>> from fuxi.aop.runtime.dataset import TagFilter, Dataset
>>> def aop_config() -> AOPConfig:
...     return {
...         "project_id": "1561",
...         "key": "ecnarXSX3GRnsd7J2qAB2FOoix91dfZD",
...         "secret": "98YikpJVn7rndSzi28PYO6Y0iytCnxSN",
...         "server_id": "4145",
...         "platform_service_url": "https://project-manager-gateway-grpc.apps-sl.danlu.netease.com",
...     }
>>> dataset = await Dataset.init(aop_config())
>>> @dataclass
... class CapInput:
...     label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
...     label: Literal[1, 2, 3]
...
>>> table = await dataset.load('task.agent.cap', schema = (CapInput, CapOutput))
>>> await table.query(id=1)
>>> await table.query(tag=TagFilter(must=["tag1", "tag2"]))

async upload(data_list)

Upload data to the dataset.

  • Parameters: data_list (list[RecordData[I, O]]) – a list of data objects
  • Returns: whether the upload succeeded
  • Return type: bool
  • Examples:
>>> @dataclass
... class CapInput:
...     label: Literal[1, 2, 3]
...
>>> @dataclass
... class CapOutput:
...     label: Literal[1, 2, 3]
...
>>> table_name = 'test_dataset'
>>> table = await dataset.load(table_name, schema = (CapInput, CapOutput))
>>> await table.upload([RecordData(CapInput(1), CapOutput(1))])

set_serializer(input_seri, output_seri)

Set the serializers.

  • Parameters:
    • input_seri (Serializer[Any]) – the cap input serializer
    • output_seri (Serializer[Any]) – the cap output serializer

async load_view_history()

Load the view-history dataset.

  • Returns: the view-history dataset
  • Return type: DatasetViewHistoryDataset