編程學習網 > 編程語言 > Python > python es教程(Python連接es更新操作)
2023
06-27

python es教程(Python連接es更新操作)


這一篇筆記介紹如何使用 Python 對數據進行更新操作。


對于 es 的更新的操作,不用到 Search() 方法,而是直接使用 es 的連接加上相應的函數來操作,本篇筆記目錄如下:

獲取連接
update()
update_by_query()
批量更新
UpdateByQuery()
1、獲取連接
如果使用的是之前的全局創建連接的方式:

from elasticsearch_dsl import connections
connections.configure(
    default={"hosts": "localhost:9200"},
)
我們可以根據別名獲取相應的連接:

conn = connections.connections.get_connection("default")
或者我們直接使用 elasticsearch.Elasticsearch 模塊來重新建立一個連接:

from elasticsearch import Elasticsearch

conn = Elasticsearch(hosts="localhost:9200")
前面介紹過,我們安裝 elasticsearch_dsl 依賴的時候,會自動為我們安裝上相應的 elasticsearch 模塊,我們這里直接使用即可。

然后通過 conn 連接可以直接對數據進行更新,可用的方法有 update(),update_by_query() 以及一個批量的 bulk() 方法。

2、update()
update() 函數一般只用于指定 id 的更新操作,如果我們知道一條數據的 id,我們可以直接使用 update()。

比如對于 exam 這個 index 下 id=18 的數據,我們想要更新它的 name 字段和 address 字段分別為 王五和湖南省,我們可以如下操作:

conn.update(
    index="exam",
    id=18,
    body={
        "doc": {
            "name": "王五2",
            "address": "湖南省",
        }
    }
)
在上面的操作中,index 為指定的索引,id 參數為我們需要更新的 id,body 內 doc 下的字段即為我們要更新的數據。

3、update_by_query()
update_by_query() 函數不局限于 id 的查詢更新,我們可以更新任意符合條件的數據,以下是一個簡單的示例:

conn.update_by_query(
    index="exam",
    body={
        "query": {
            "term": {"name":  "張三豐"}
        },
        "script": {
            "source": "ctx._source.address = params.address",
            "params": {
                "address": "新地址",
            }
        }
    }
)
在這里,index 參數還是指向對應的索引,body 內包含了需要更新查詢的條件,這里都在 query 參數內,需要更新的數據在 script 下,通過腳本的形式來操作更新。

這里注意下,我這里用到的是 7.6.0 版本,所以 script 下使用的 source,更低一點版本用的字段可能是 inline,這里使用對應版本的參數即可。

在 script.source 中,內容為 ctx._source.address = params.address,意思是將符合條件數據的 address 字段內容更新為 params 的 address 的數據。

如果想要更改其他字段內容,注意前面 ctx._source 為固定寫法,只需要更改后面的字段名即可。

在 script.params 中,我們則可以定義各種對應的字段及其內容。

更新多個字段
如果我們想同時更新多個字段,比如說符合條件的數據將 address 改為 新地址,將 age 字段改為 28,我們則需要將多個條件在 script.source 中使用分號 ; 連接起來,示例如下:

conn.update_by_query(
    index="exam",
    body={
        "query": {
            "term": {"name":  "新張三豐2"}
        },
        "script": {
            "source": "ctx._source.address = params.address; ctx._source.age = params.age",
            "params": {
                "address": "新地址3",
                "age": "28"
            }
        }
    }
)
雖然這里更新多個字段需要使用分號連接,但是在實際的代碼中我們不用這么寫死,比如說我們需要更改三個字段,為 ["address", "name", "age"],我們如下操作:

field_list = ["address", "name", "age"]
source_list = [f"ctx._source.{key}=params.{key}" for key in field_list]

params = {
    "address": "新地址3",
    "age": "28",
    "name": "new name"
}

conn.update_by_query(
    index="exam",
    body={
        "query": {
            "term": {"name":  "新張三豐3"}
        },
        "script": {
            "source": ";".join(source_list),
            "params": params
        }
    }
)
4、批量更新
如果我們想批量更新一批數據,這批數據各個字段的值都不一致,自定義的程度很大,使用 update_by_query() 函數已經不現實了,怎么辦?

好解決,我們可以使用 helpers.bulk() 批量更新方法。

首先引入這個模塊:

from elasticsearch import helpers
假設我們系統里現在有 id 為 21,23,24 的幾條數據,還是在 exam 這個索引下,我們來構造幾條需要更新的數據來操作:

action_1 = {
    "_op_type": "update",
    "_index": "exam",
    "_id": 21,
    "doc": {"age": 19, "name": "令狐沖", "address": "華山派"},
}

action_2 = {
    "_op_type": "update",
    "_index": "exam",
    "_id": 23,
    "doc": {"age": 20, "name": "楊過", "address": "終南山"},
}

action_3 = {
    "_op_type": "update",
    "_index": "exam",
    "_id": 24,
    "doc": {"age": 21, "name": "張無忌", "address": "武當"},
}
action_list = [action_1, action_2, action_3]
helpers.bulk(conn, actions=action_list)
對于每一條需要更新的數據,有這幾個參數:

_op_type:如果是更新操作,其值則是 update

_index:表示需要更新的數據所在的索引,這里是 exam

_id:表示這條需要更新的數據的 id

doc:是一個 dict 數據,其下包含了需要更新的字段及其對應的值

至此,一條需要更新的數據的結構就構造完畢了。

然后對于 helpers.bulk() 函數,接收的第一個參數為 es 連接,actions 參數是一個列表,其內容就是我們前面構造的數據的集合。

然后執行這個操作就可以發現 es 中對應的值已經更改了。

5、UpdateByQuery()
UpdateByQuery() 函數來源于 elasticsearch_dsl 模塊,它的使用和 Search() 方法差不多,都是通過 using 和 index 參數來獲取 es 連接和索引:

from elasticsearch_dsl import connections
from elasticsearch_dsl import UpdateByQuery
from elasticsearch_dsl import Q as ES_Q

connections.configure(
    default={"hosts": "localhost:9200"},
)


ubq = UpdateByQuery(using="default", index="exam")
使用這個方法更新數據的具體語法和 update_by_query 差不多,都是通過 script 的方式來操作,以下是一個簡單示例:

ubq = UpdateByQuery(using="default", index="exam")

q1 = ES_Q("term", name="郭靖")

ubq = ubq.query(q1)

ubq = ubq.script(
    source="ctx._source.address=params.address",
    params={
        "address": "襄陽城"
    }
)

ubq.execute()

與 Search() 函數一樣,都需要通過 execute() 函數來向 es 提交數據。

以上就是python es教程(Python連接es更新操作)的詳細內容,想要了解更多Python教程歡迎持續關注編程學習網。

掃碼二維碼 獲取免費視頻學習資料

Python編程學習

查 看2022高級編程視頻教程免費獲取