电商搜索店铺评分加权实践

背景

在电商平台的商品搜索排序中,我们不仅要考虑商品本身的相关性,还需要关注店铺的服务质量。店铺评分反映了商家在平台上的综合表现(通常以5分制表示),包括商品质量、服务态度、物流效率等因素。一些评分较低的店铺可能意味着用户体验不佳,为了提升搜索结果的质量,我们希望在排序时对店铺评分加权处理:高评分店铺的商品获得适当优势,低评分店铺的商品则适当降低排名。

举例来说,如果一个卖家店铺评分很低(例如低于2.5分),我们希望它的商品在搜索结果中不要占据靠前位置,即使这些商品本身的匹配度还不错。为此,我们计划对这类商品的最终得分扣除一定比例(例如5%),从而在排序中稍微降权。相应地,如果店铺评分较高,则维持正常排序(本次实现中高分店铺不额外加权,但预留了扩展的可能)。

方案设计

为实现上述目标,我们在搜索系统中新增了一个店铺评分相关的字段,并建立了一套数据同步和排序加权机制,整体方案如下:

  • 数据来源:店铺评分数据由平台的大数据团队离线计算产生,存储在独立的店铺得分表中(例如每日更新的店铺评分数据库表)。评分取值0~5分,小数点一位精度。为了让搜索系统及时获取更新,我们通过 消息队列(Kafka) 将评分变化推送到搜索系统,实现近实时同步。同时也保留批处理方案定期全量更新数据,确保数据一致性。
  • 索引扩展:在商品的搜索索引中新增“店铺评分”字段(例如store_score),用于存储每个商品所属店铺的评分。这个字段在建立索引时从店铺得分表获取,对于已有索引的数据通过批处理或增量更新方式补充。
  • 排序加权:搜索查询时,引擎会先根据文本相关度、商品属性匹配等计算基础分数。在二次排序阶段,引入店铺评分权重:如果商品所属店铺评分低于阈值(如2.5),则将该商品的基础得分乘以一个惩罚系数(如0.95,即降低5%);如果评分高于阈值则不受影响(系数为1)。阈值和惩罚系数均做成可配置,以便日后根据效果调优。

下面我们分别介绍数据接入、索引更新和排序加权的具体实现,并给出对应的python代码示例。

Kafka消息消费实现

为了实现店铺评分数据的实时同步,我们搭建了Kafka消费者来订阅评分更新主题。例如,大数据计算完成店铺评分后,将变化的店铺评分以消息形式发送到Kafka主题(如store-rating-topic)。搜索系统的消费模块订阅该主题,持续监听新消息并处理更新。

下面是Kafka消息消费的代码示例:

from kafka import KafkaConsumer

def consume_shop_ratings():
    consumer = KafkaConsumer(
        'shop-rating-updates',
        bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
        auto_offset_reset='latest',
        group_id='shop-rating-group',
        enable_auto_commit=True,
        consumer_timeout_ms=1000
    )

    for message in consumer:
        shop_id, rating = message.value.decode('utf-8').split(",")
        rating = float(rating)
        update_shop_rating(shop_id, rating)

def update_shop_rating(shop_id, rating):
    # 示例更新缓存或数据库,这里用简单打印代替
    print(f"Shop {shop_id} rating updated to {rating}")

if __name__ == "__main__":
    consume_shop_ratings()

上述代码中,Kafka消费者订阅了store-rating-topic主题,不断轮询获取店铺评分更新消息。收到消息后,解析出店铺ID和新的评分值,随后调用updateStoreScore方法进行处理。这里我们可以将更新后的店铺评分暂存在内存缓存或队列中,然后异步触发对搜索索引中相关商品的更新(例如调用索引更新接口,更新这些商品的店铺评分字段)。

需要注意的是,实际应用中应考虑消费者的容错和效率:例如确保消费过程是幂等的(重复消息不会造成数据不一致),处理异常情况(Kafka连接中断重试等),以及对更新频率较高的店铺进行批量合并更新等优化。上述代码仅作为示例,省略了完整的错误处理和批量控制逻辑。

批处理店铺评分同步

除了实时消息,我们也实现了批处理方式来定期同步全量的店铺评分数据。这通常在以下情况下使用:初次上线该功能时需要给所有商品补充店铺评分字段,或者作为一种兜底方案定期纠正可能遗漏的更新。批处理通过直接读取店铺得分表(例如存储在MySQL或Hive中的离线计算结果)来获取所有店铺的评分,然后更新搜索索引。

批处理任务可以每日凌晨运行,执行以下步骤:

  1. 从店铺评分数据库表中读取所有店铺的当前评分。
  2. 将店铺ID和评分加载到内存(如Map结构)或者生成更新指令列表。
  3. 调用索引更新模块,将每个店铺对应商品的索引字段更新为最新评分。

下面是批量读取店铺评分数据的代码示例:

import pymysql

def fetch_shop_ratings():
    connection = pymysql.connect(
        host='your-db-host',
        user='your-username',
        password='your-password',
        database='shop_rating_db'
    )

    shop_rating_map = {}
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT shop_id, rating FROM daily_shop_ratings")
            for shop_id, rating in cursor.fetchall():
                shop_rating_map[shop_id] = rating
    finally:
        connection.close()

    return shop_rating_map

if __name__ == "__main__":
    ratings = fetch_shop_ratings()
    print(f"Total shops loaded: {len(ratings)}")
    # 后续调用索引更新函数

上述批处理代码通过pymysql连接到数据库,执行SQL查询批量获取所有店铺的评分数据,并存入mysql。拿到全量数据后,接下来就可以遍历该Map,对搜索索引中的每个店铺相关商品执行更新(下面的索引更新模块会详细介绍)。批处理过程需要关注性能和资源占用,可以考虑分页读取或流式处理以避免一次性加载过多数据到内存。另外,应确保在批量更新索引时对搜索服务影响最小,可使用低峰期执行并合理控制提交频率。

索引字段扩展与更新

有了店铺评分数据,我们需要将其写入搜索索引,以便在查询排序时使用。首先,我们在索引架构中新增了一个字段,例如store_score,类型可以选择数值类型(如float)以存储评分值。对于已经存在的商品索引数据,需要进行一次更新以填充这个字段。我们采用两种方式更新索引:全量重建或增量更新。全量重建即重新索引所有商品数据时,将店铺评分一起写入(推荐在初次上线或数据量可控时采用)。增量更新则指对已有索引执行部分更新,这里可以利用前述Kafka消息或批处理结果,对变化的店铺逐一更新其商品文档。

为了演示索引更新,这里提供一个利用pysolr进行索引原子更新的代码示例。pysolr支持对已有文档的特定字段进行原子更新,而无需重建整个文档。假设我们已经获取了需要更新的店铺及其新的评分值,以及该店铺下所有商品的ID列表,我们可以如下更新Solr索引:

import pysolr

solr_url = 'http://your-solr-host:8983/solr/product_core'
solr_client = pysolr.Solr(solr_url, always_commit=True)

def update_shop_rating_index(shop_id, rating, product_ids):
    docs = []
    for pid in product_ids:
        doc = {
            "id": pid,
            "shop_rating": {"set": rating}
        }
        docs.append(doc)
    solr_client.add(docs)
    print(f"Updated shop {shop_id} products with rating {rating}")

if __name__ == "__main__":
    example_shop_id = 'shop789'
    example_rating = 4.2
    example_products = ['prod001', 'prod002', 'prod003']
    update_shop_rating_index(example_shop_id, example_rating, example_products)

上述代码通过pysolr连接到Solr服务器,然后构建字典来进行字段更新。

对于使用Elasticsearch等其他搜索引擎的情况,实现方式略有不同,但思路类似:先确保索引有店铺评分字段,然后批量更新文档的该字段值。无论采用何种搜索引擎,目的都是让每个商品文档都携带其店铺评分,方便后续的排序计算。

搜索排序加权实现

当索引中包含了店铺评分字段后,就可以在查询时应用我们的加权策略了。在搜索服务的二次排序阶段,我们获取每个结果商品的基础分值以及店铺评分,根据阈值判断是否需要降权。基础分值通常由多种相关度因素构成,例如商品与关键词的匹配度分数、商品是否属于用户偏好的品牌加分、品类匹配加分等,这里可以将这些分数加总得到一个初始得分。然后,引入店铺评分权重:如果店铺评分低于设定阈值,则乘以惩罚系数;反之则维持不变(系数为1)。

例如,我们设定店铺评分阈值为2.5分,惩罚系数为0.95(即扣除5%得分)。当某商品所属店铺评分为2.0时(低于2.5),其最终得分将 = 基础分 * 0.95;如果店铺评分为3.5(高于阈值),最终得分 = 基础分 * 1.0(不变)。通过这种方式,低评分店铺的商品得分略微降低,在整体排序中会略微靠后。

下面给出二次排序加权的简化代码示例:

THRESHOLD_RATING = 2.5
PENALTY_FACTOR = 0.95

def calculate_final_score(base_score, shop_rating):
    if shop_rating < THRESHOLD_RATING:
        return base_score * PENALTY_FACTOR
    return base_score

if __name__ == "__main__":
    products = [
        {"id": "prod001", "base_score": 80.0, "shop_rating": 2.0},
        {"id": "prod002", "base_score": 75.0, "shop_rating": 3.0},
        {"id": "prod003", "base_score": 90.0, "shop_rating": 4.5},
    ]

    for product in products:
        final_score = calculate_final_score(product["base_score"], product["shop_rating"])
        print(f"Product {product['id']} final score: {final_score}")

在真实的搜索排序代码中,我们会对每一条候选商品结果应用类似逻辑,计算其最终排序分数。通常这是在内存中完成的,可通过自定义比较器或者在排序评分公式中直接乘以权重实现。如果使用Solr/ES等引擎的功能,也可以考虑在查询时利用函数查询或脚本实现同样效果,例如Solr的if函数或ES的脚本评分(script score)将店铺评分作为条件纳入评分计算。但在我们的实现中,为了直观和灵活控制,我们选择在搜索服务层面进行计算。

通过上述加权策略,搜索结果会对低评分店铺的商品有所抑制,提升用户在前几页看到的商品整体质量。当然,具体的阈值和扣分比例可以根据实际效果调整。如果未来希望对高评分店铺商品进行加权提高分数,也可以引入类似机制(如评分高于4.5分的店铺商品提高若干百分比得分),以鼓励优质卖家。

总结

在本次实践中,我们针对电商搜索引擎引入了店铺评分维度的排序优化。通过数据同步+索引更新+排序调整的方案,成功实现了对低评分店铺商品的降权,提升了搜索结果的可靠性和用户体验。实施过程中,我们采用了Kafka实时消息和定时批处理相结合的方式确保数据及时准确,并在搜索索引中增加新字段以支持复杂的排序逻辑。整个改造遵循可配置、可扩展的原则,使运营团队能够根据业务需要调整阈值或权重比例。

上线后经过验证,店铺评分加权机制有效避免了少数低信誉店铺商品占据搜索前列的情况,搜索结果的点击率和转化率也有所提升。这一实践表明,在搜索排序中融入更多业务质量指标(如店铺评分)是提高结果相关性和用户满意度的重要手段。展望未来,我们可以进一步结合其他信号(如商家响应时间、库存周转率等)丰富排序策略,不断优化电商搜索的体验。

Ge Yuxu • AI & Engineering

脱敏说明:本文所有出现的表名、字段名、接口地址、变量名、IP地址及示例数据等均非真实,仅用于阐述技术思路与实现步骤,示例代码亦非公司真实代码。示例方案亦非公司真实完整方案,仅为本人记忆总结,用于技术学习探讨。
    • 文中所示任何标识符并不对应实际生产环境中的名称或编号。
    • 示例 SQL、脚本、代码及数据等均为演示用途,不含真实业务数据,也不具备直接运行或复现的完整上下文。
    • 读者若需在实际项目中参考本文方案,请结合自身业务场景及数据安全规范,使用符合内部命名和权限控制的配置。

Data Desensitization Notice: All table names, field names, API endpoints, variable names, IP addresses, and sample data appearing in this article are fictitious and intended solely to illustrate technical concepts and implementation steps. The sample code is not actual company code. The proposed solutions are not complete or actual company solutions but are summarized from the author's memory for technical learning and discussion.
    • Any identifiers shown in the text do not correspond to names or numbers in any actual production environment.
    • Sample SQL, scripts, code, and data are for demonstration purposes only, do not contain real business data, and lack the full context required for direct execution or reproduction.
    • Readers who wish to reference the solutions in this article for actual projects should adapt them to their own business scenarios and data security standards, using configurations that comply with internal naming and access control policies.

版权声明:本文版权归原作者所有,未经作者事先书面许可,任何单位或个人不得以任何方式复制、转载、摘编或用于商业用途。
    • 若需非商业性引用或转载本文内容,请务必注明出处并保持内容完整。
    • 对因商业使用、篡改或不当引用本文内容所产生的法律纠纷,作者保留追究法律责任的权利。

Copyright Notice: The copyright of this article belongs to the original author. Without prior written permission from the author, no entity or individual may copy, reproduce, excerpt, or use it for commercial purposes in any way.
    • For non-commercial citation or reproduction of this content, attribution must be given, and the integrity of the content must be maintained.
    • The author reserves the right to pursue legal action against any legal disputes arising from the commercial use, alteration, or improper citation of this article's content.

Copyright © 1989–Present Ge Yuxu. All Rights Reserved.