开发指南

加入新特性

新的模型结构

  • 加入到xdpx/models/中,继承xdpx.models.Model类;

  • 如果是基于bert的模型,继承xdpx.models.bert.Bert。

  • 如果需要导入huggingface/transformers的相关内容,应当从模型中的离线版本导入,例如from xdpx.modules.thirdparty.transformers.modeling_bert import BertModel, BertConfig

  • 如果模型包含预训练embeddings,或用于对抗训练任务,需要实现Model类中的get_embeddings方法

新的损失函数/指标

  • 加入到xpdx/losses中,继承xdpx.losses.Loss类。

  • forward返回:

    • loss 后续用来backward的Tensor

    • sample_size:loss的由多少个样本组成,分布式训练时用于各个节点的加权;注意如果loss直接是所有target求出的loss的均值,那么这个值需要设置为1;如果loss是总和,那么这个值等于target的数量

    • logging_output:python native dict,注意所有tensor必须要.tolist()之后再传入

  • aggregate_logging_outputs

    • 输入

      • logging_outputs: List[Dict[str, Any]],所有分布式节点forward方法返回的logging_output的列表

      • sample_size:int,所有分布式节点forward方法返回的sample_size的和

    • 返回Dict[str, Union[float, int]],根据所有节点的输出计算出的整体的log指标,会进入日志文件;注意sample_size和ntokens是默认需要传出的项:

def aggregate_logging_outputs(self, logging_outputs, sample_size, max_count=None):
    """Aggregate logging outputs from data parallel training."""

    agg_output = {
        'sample_size': sample_size,
    }
    if 'ntokens' in logging_outputs[0]:
        ntokens = sum(log['ntokens'] for log in logging_outputs)
        agg_output['ntokens'] = ntokens
    # 其他需要计算的指标
    ...
    return agg_output

新的数据结构/格式

XDPX中的数据链路为

  • [预处理 Parser.parse_line->Loader.parse->(Loader.merge->)Processor.numerize]

  • -> [训练Processor.collate -> Model.forward->Loss.forward]

内部流通的数据格式示例:

  • 原始数据 取出每条样本[1]-> Parser.parse_line[2] -> [‘数据字段1’, ‘数据字段2’, ‘标签’]

  • Loader[4]

    • Loader.parse[3] -> {‘id’: 1234, ‘key1’: [‘数据’,’字段’,’1’], ‘key2’: [‘数据’,’字段’,’2’]}

    • Loader.parse_target -> {‘target’: ‘标签’}

  • Processor.numerize[5] -> {‘id’: 1234, ‘key1’: [10, 111, 65], ‘key2’: [10, 111, 66], ‘target’: 0}

  • Processor.collate[6] -> …

[1] 如果parser配置为json,则假设数据是一个json文件并且主体是一个list,每个元素是一条样本;否则假设数据是基于行的,每行一条样本。 [2] 现有的Parser选项(none/csv/json/jsonl)应当可以通过配置来适配主流的数据格式,其他格式可以继承xdpx.loaders.parsers.Parser类实现一个新的parser [3] def parse(cls, contents: List[str], _id) -> dict,这一步如果workers>1会在多进程内执行,通过cls.tokenize(text)可以调用Tokenizer,输出分词后的样本;如果这条样本不是有效的样本,可以在这一步return None把它过滤掉 [4] Loader如果有变动,还需要实现的方法:

  • 实现def length(self, sample: dict) -> int,以便统计数据中超出最大长度的比例

  • 如果数据有中标签的字段,实现num_sections和num_targets,分别为总字段数和标签占用的字段数,这样在预测无标签样本时,如果发现没有标签字段,则不会调用parse_target

  • 如果需要跨行整合,实现def merge(self, samples: List[dict]) -> List[dict]

[5] 把str类型的token、target等根据self.dictionary、self.target_map转为对应的int类型的id,并限制最大序列长度 [6] 输入一个batch: List[dict](里面每一个dict就是numerize的输出),做padding后给出模型的输入,以PairProcessor的collate方法为例:

def collate(self, samples: List[dict]):
    tokens1 = torch.LongTensor(self.pad([sample['tokens1'] for sample in samples]))
    tokens2 = torch.LongTensor(self.pad([sample['tokens2'] for sample in samples]))
    mask1 = torch.ne(tokens1, self.args.pad_index)
    mask2 = torch.ne(tokens2, self.args.pad_index)
    batch = {
        'id': [sample['id'] for sample in samples],
        'net_input': {
            'tokens1': tokens1,
            'tokens2': tokens2,
            'mask1': mask1,
            'mask2': mask2,
        },
        'ntokens': tokens1.numel() + tokens2.numel(),
    }
    try:
        target = torch.LongTensor([sample['target'] for sample in samples])
        batch.update({'target': target})
    except KeyError:
        ...
    return batch

其中’net_input’中的就是直接输入模型forward方法的数据:logits = model(**batch['net_input']),ntokens会用于训练速度的计算;’id’一般仅在debug或数据处理时使用,因此不需要转换成向量。

新的训练方式

  • 新的训练方式:继承xdpx.tasks.Task类,一般实现的是train_step和valid_step方法。

  • 新的数据采样方式:继承xdpx.datasets中的torch.utils.data.Dataset或者torch.utils.data.Sampler

注意:基类中的register方法,如果是staticmethod,那么继承后也应该使用staticmethod;如果是classmethod,那么继承后也应该使用classmethod。请参考其他已经实现的子类,如果他们调用了基类的注册方法super().register(options),那么你实现的子类也应该调用。

单元测试

运行所有单元测试:python -m unittest discover tests/unit_tests 运行单个单元测试:python -m unittest tests.unit_tests.test_xxx 运行全流程测试:bash tests/test_pipeline.sh 在PAI上运行全流程测试:bash pai.sh test 在PAI上运行benchmark测试:bash pai.sh benchmark 在本地执行代码覆盖率测试:

coverage erase
coverage run tests/test_legacy.py test
coverage combine
coverage html

报告导出在user/htmlcov/。 在PAI上执行代码覆盖率测试:bash pai.sh coverage oss://.../cov.zip

在新代码合并master之前,需要通过单元测试,并且在各个环境验证全流程测试是否通过、benchmark测试是否可以回归之前的结果。

新参数定义

TBD;这块文档写起来没有代码本身直观,可以参考已有的参数定义方式或参考xdpx.options中的源码。

已知的局限

  • AutoML在PAI上运行时由于每一个训练Snapshot都要存到云端,当前速度较慢;未来结合本地缓存+定期同步会大大加速;

  • Tensorflow仅支持1.x版本,暂不支持和2.x版本的互相转换。