ADF - 数据流中的复杂嵌套数组

ADF - Complex nested array in Data Flow

提问人:Regazzi 提问时间:7/31/2023 更新时间:7/31/2023 访问量:192

问:

我在 ADF 中使用数据流有点新手,所以这就是我向您寻求帮助的原因。情况如下:

我想从数据流中的 REST API 转换 Json 文件,以创建表格/逻辑表结构。转换后,我想将此数据 + 结构发送到 Azure SQL 数据库。

以下是我所做的概述:

创建了一个管道,用于将数据从网站复制到我的数据湖。

Copy Activity

ADF Pipeline Sink

这是输入源:

{
    "id":[
        {
            "value":40051
        }
    ],
    "uuid":[
        {
            "value":"0ca12ac9-d94b-44cf-a35b-8b7256006cf8"
        }
    ],
    "revision_id":[
        {
            "value":1452381
        }
    ],
    "langcode":[
        {
            "value":"nl"
        }
    ],
    "type":[
        {
            "target_id":"par_chart",
            "target_type":"paragraphs_type",
            "target_uuid":"2c3143a2-bd78-4b4d-afb6-19160de928f2"
        }
    ],
    "status":[
        {
            "value":true
        }
    ],
    "created":[
        {
            "value":"2019-10-17T12:08:05+00:00",
            "format":"Y-m-d\\TH:i:sP"
        }
    ],
    "parent_id":[
        {
            "value":"2561"
        }
    ],
    "parent_type":[
        {
            "value":"node"
        }
    ],
    "parent_field_name":[
        {
            "value":"field_paragraphs"
        }
    ],
    "behavior_settings":[
        {
            "value":[
                
            ]
        }
    ],
    "default_langcode":[
        {
            "value":true
        }
    ],
    "revision_translation_affected":[
        {
            "value":true
        }
    ],
    "content_translation_source":[
        {
            "value":"und"
        }
    ],
    "content_translation_outdated":[
        {
            "value":false
        }
    ],
    "content_translation_changed":[
        {
            "value":"2023-05-09T09:48:38+00:00",
            "format":"Y-m-d\\TH:i:sP"
        }
    ],
    "field_par_chart":[
        {
            "csv":"[[\"\",\"Percentage behandeling\",\"Percentage behandeling\",\"Percentage behandeling\"],[\"2010\",\"19.4\",null,\"\"],[\"2011\",\"16.6\",null,\"\"],[\"2012\",\"15.4\",null,\"\"],[\"2013\",\"13.5\",null,\"\"],[\"2014\",\"13\",null,\"\"],[\"2015\",\"13\",null,\"\"],[\"2016\",\"14.1\",null,\"\"],[\"2017\",\"17.7\",null,\"\"],[\"2018\",\"24\",null,\"\"],[\"2019\",\"\",\"27.7\",\"\"],[\"2020\",\"\",\"31.9\",\"\"],[\"2021*\",\"\",\"\",\"42.9\"],[\"2022*\",null,\"\",\"46.2\"]]",
            "csv_url":"",
            "config":"{\"chart\":{\"type\":\"line\",\"renderTo\":{\"hcEvents\":{\"mousedown\":[{\"order\":null}],\"touchstart\":[{\"order\":null}],\"mouseover\":[{\"order\":null}],\"mouseout\":[{\"order\":null}]},\"__EV_STORE_KEY@7\":{}}},\"xAxis\":[{\"type\":\"category\",\"index\":0,\"isX\":true}],\"yAxis\":[{\"title\":{\"text\":\"Percentage\",\"offset\":-81.859375},\"min\":0,\"tickInterval\":5,\"index\":0,\"events\":{}}],\"templateName\":\"lineBasic\",\"series\":[{\"type\":null,\"animation\":false,\"data\":[{\"name\":\"2010\",\"y\":19.4},{\"name\":\"2011\",\"y\":16.6},{\"name\":\"2012\",\"y\":15.4},{\"name\":\"2013\",\"y\":13.5},{\"name\":\"2014\",\"y\":13},{\"name\":\"2015\",\"y\":13},{\"name\":\"2016\",\"y\":14.1},{\"name\":\"2017\",\"y\":17.7},{\"name\":\"2018\",\"y\":24},{\"name\":\"2019\",\"y\":null},{\"name\":\"2020\",\"y\":null},{\"name\":\"2021*\",\"y\":null},{\"name\":\"2022*\",\"y\":null}],\"name\":\"Percentage behandeling\",\"_colorIndex\":0,\"_symbolIndex\":0},{\"type\":\"line\",\"animation\":false,\"data\":[{\"name\":\"2010\",\"y\":null},{\"name\":\"2011\",\"y\":null},{\"name\":\"2012\",\"y\":null},{\"name\":\"2013\",\"y\":null},{\"name\":\"2014\",\"y\":null},{\"name\":\"2015\",\"y\":null},{\"name\":\"2016\",\"y\":null},{\"name\":\"2017\",\"y\":null},{\"name\":\"2018\",\"y\":null},{\"name\":\"2019\",\"y\":27.7},{\"name\":\"2020\",\"y\":31.9},{\"name\":\"2021*\",\"y\":null},{\"name\":\"2022*\",\"y\":null}],\"name\":\"Percentage behandeling\"},{\"type\":\"line\",\"animation\":false,\"data\":[{\"y\":null,\"name\":\"2010\"},{\"y\":null,\"name\":\"2011\"},{\"y\":null,\"name\":\"2012\"},{\"y\":null,\"name\":\"2013\"},{\"y\":null,\"name\":\"2014\"},{\"y\":null,\"name\":\"2015\"},{\"y\":null,\"name\":\"2016\"},{\"y\":null,\"name\":\"2017\"},{\"y\":null,\"name\":\"2018\"},{\"y\":null,\"name\":\"2019\"},{\"y\":null,\"name\":\"2020\"},{\"y\":42.9,\"name\":\"2021*\"},{\"y\":46.2,\"name\":\"2022*\"}],\"name\":\"Percentage behandeling\"}],\"title\":{\"text\":\"Trend in wachttijden voor behandeling in ziekenhuis langer dan de Treeknorm\"},\"legend\":{\"enabled\":false}}"
        }
    ],
    "field_par_extra_info":[
        {
            "value":"singlecard"
        }
    ],
    "field_par_hidden":[
        {
            "value":false
        }
    ],
    "field_par_text":[
        {
            "value":"<ul>\r\n\t<li><em>Door meerdere veranderingen van de bron en methode zijn de percentages van 2010-2018, 2019-2020 en 2021-2022 niet goed met elkaar te vergelijken.</em></li>\r\n\t<li><em><span><span><span>De wachttijden van 2021 zijn enkel gebaseerd op wachttijden gemeten van augustus&nbsp;2021 t/m december&nbsp;2021 vanwege een verandering in de methode. In de eerste helft van 2021 was 28,7% van de wachttijden voor behandeling langer dan de treeknorm.</span></span></span></em></li>\r\n\t<li><em>In 2020 brak de coronapandemie uit. Hiermee moet rekening worden gehouden bij het interpreteren van de trends.</em></li>\r\n</ul>\r\n\r\n<p><strong>Bron&nbsp;</strong>&nbsp;<br />\r\nWachttijdenonderzoek, Mediquest<br />\r\nWachttijdenregistratie NZa<br />\r\n<strong>Verslagjaar&nbsp; t/m</strong><br />\r\n2022<br />\r\n<strong>Laatste update gegevens&nbsp;</strong><br />\r\n24 mei 2023<br />\r\n<strong>Updatefrequentie&nbsp;</strong><br />\r\nJaarlijks<br />\r\n<strong>Meer info</strong><br />\r\n<a href=\"https://www.vzinfo.nl/prestatie-indicatoren/wachttijd-langer-dan-treeknorm-behandeling\" target=\"_blank\">Prestatie-indicatoren gezondheidszorg op VZinfo.nl</a></p>\r\n",
            "format":"volledige_html",
            "processed":"<ul><li><em>Door meerdere veranderingen van de bron en methode zijn de percentages van 2010-2018, 2019-2020 en 2021-2022 niet goed met elkaar te vergelijken.</em></li>\n\t<li><em><span><span><span>De wachttijden van 2021 zijn enkel gebaseerd op wachttijden gemeten van augustus 2021 t/m december 2021 vanwege een verandering in de methode. In de eerste helft van 2021 was 28,7% van de wachttijden voor behandeling langer dan de treeknorm.</span></span></span></em></li>\n\t<li><em>In 2020 brak de coronapandemie uit. Hiermee moet rekening worden gehouden bij het interpreteren van de trends.</em></li>\n</ul><p><strong>Bron </strong> <br />\nWachttijdenonderzoek, Mediquest<br />\nWachttijdenregistratie NZa<br /><strong>Verslagjaar  t/m</strong><br />\n2022<br /><strong>Laatste update gegevens </strong><br />\n24 mei 2023<br /><strong>Updatefrequentie </strong><br />\nJaarlijks<br /><strong>Meer info</strong><br /><a href=\"https://www.vzinfo.nl/prestatie-indicatoren/wachttijd-langer-dan-treeknorm-behandeling\" target=\"_blank\">Prestatie-indicatoren gezondheidszorg op VZinfo.nl</a></p>\n\n"
        }
    ],
    "field_par_text_bgcolor":[
        {
            "value":"bg-gray-lightest"
        }
    ],
    "field_par_text_position":[
        {
            "value":"below"
        }
    ],
    "field_par_title":[
        {
            "value":"Trend "
        }
    ],
    "field_par_title_class":[
        
    ],
    "field_par_title_enable":[
        {
            "value":false
        }
    ],
    "field_par_title_tag":[
        {
            "value":"h4"
        }
    ]
}

这是 ADF 管道的接收器部分:

它作为.json文件保存在数据湖中。

enter image description here

重要提示:我只需要转换 Json 元素的这一部分:field_par_chart与“csv”元素

enter image description here

预期的数据结构必须如下所示(见下图)并保存为文本文件。

enter image description here

如何使用 ADF 管道和/或 ADF 数据流执行此操作的任何建议?

非常感谢您的时间和精力!

数组 JSON 嵌套的 azure-data-factory google-cloud-dataflow

评论

0赞 Rakesh Govindula 7/31/2023
是否有任何限制,您应该只使用数据流,而不能使用任何复制活动或查找?
0赞 Regazzi 7/31/2023
@RakeshGovindula感谢您的回复。不,除了我的经验之外,没有任何限制
0赞 Rakesh Govindula 7/31/2023
"csv":"[[\"\",\"Percentage behandeling\",这里没有提到您的第一列名称。但是在您的预期输出中,它是有的。您的 JSON 在提供问题时是否遗漏了它,或者您想手动添加该列名称?Category
0赞 Regazzi 7/31/2023
@RakeshGovindula想要手动添加“类别”列。
1赞 Regazzi 8/3/2023
@RakeshGovindula非常感谢!

答:

1赞 Rakesh Govindula 7/31/2023 #1

如果目标列有限且列名称已知,则可以尝试以下方法。

在预期输出中,有 3 列具有相同的名称,ADF 或 SQL 不支持这些列。因此,我忽略了字段中的第一个数组(列名数组)。field_par_chart

由于您只需要字段中的数据,请使用选择转换从源中删除其余字段。field_par_chart

enter image description here

然后我进行了 3 个推导柱变换。

派生列1:

它拆分字符串并生成具有以下动态表达式的数组数组。

map(split(replace(field_par_chart_csv,'[',''),'],'),split(replace(replace(#item,'"',''),']',''),','))

enter image description here

派生列2:

由于列名相同,它跳过第一个子数组(列名数组),并使用此动态表达式展开所有子数组并将它们转换为如下所示的行。

unfold(slice(arr,2))

enter image description here

派生列3:

它从行数组生成所需的列。在这里,我手动给出列名并将值从字符串转换为双精度。您可以手动提供所需的任何列名称。

enter image description here

接收器中,提供目标 SQL 表,并仅提供这 4 列的映射,并删除我们从以前的转换中获得的额外列。

enter image description here

结果

enter image description here

通过管道执行此数据流,您可以将此数据加载到目标 SQL 表。

My Dataflow JSON 供您参考:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "derivedColumn2"
                },
                {
                    "name": "derivedColumn3"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          id as (value as integer)[],",
                "          uuid as (value as string)[],",
                "          revision_id as (value as integer)[],",
                "          langcode as (value as string)[],",
                "          type as (target_id as string, target_type as string, target_uuid as string)[],",
                "          status as (value as boolean)[],",
                "          created as (value as string, format as string)[],",
                "          parent_id as (value as string)[],",
                "          parent_type as (value as string)[],",
                "          parent_field_name as (value as string)[],",
                "          behavior_settings as (value as string[])[],",
                "          default_langcode as (value as boolean)[],",
                "          revision_translation_affected as (value as boolean)[],",
                "          content_translation_source as (value as string)[],",
                "          content_translation_outdated as (value as boolean)[],",
                "          content_translation_changed as (value as string, format as string)[],",
                "          field_par_chart as (csv as string, csv_url as string, config as string)[],",
                "          field_par_extra_info as (value as string)[],",
                "          field_par_hidden as (value as boolean)[],",
                "          field_par_text as (value as string, format as string, processed as string)[],",
                "          field_par_text_bgcolor as (value as string)[],",
                "          field_par_text_position as (value as string)[],",
                "          field_par_title as (value as string)[],",
                "          field_par_title_class as string[],",
                "          field_par_title_enable as (value as boolean)[],",
                "          field_par_title_tag as (value as string)[]",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'singleDocument') ~> source1",
                "select1 derive(arr = map(split(replace(field_par_chart_csv,'[',''),'],'),split(replace(replace(#item,'\"',''),']',''),','))) ~> derivedColumn1",
                "source1 select(mapColumn(",
                "          field_par_chart_csv = field_par_chart[1].csv",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "derivedColumn1 derive(new_arr = unfold(slice(arr,2))) ~> derivedColumn2",
                "derivedColumn2 derive(Category = new_arr[1],",
                "          {Percentage behandeling1} = toDouble(new_arr[2]),",
                "          {Percentage behandeling2} = toDouble(new_arr[3]),",
                "          {Percentage behandeling3} = toDouble(new_arr[4])) ~> derivedColumn3",
                "derivedColumn3 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     recreate:true,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError',",
                "     mapColumn(",
                "          Category,",
                "          {Percentage behandeling1},",
                "          {Percentage behandeling2},",
                "          {Percentage behandeling3}",
                "     )) ~> sink1"
            ]
        }
    }
}

评论

0赞 Regazzi 8/4/2023
如果我从 Json 文件源转换和加载 3 列,您的解决方案就可以很好地工作。但是,如果我有一个有 4 列的 Json 文件源会发生什么?我是否需要在步骤“derived column3:”中手动添加第 4 列???我的愿望是在数据流中有一个通用步骤。有什么建议吗?
1赞 Rakesh Govindula 8/4/2023
无论列是什么,如果您知道列名,都可以手动添加它们,例如第 5 列。AFAIK,在这种 JSON 驻留在字符串中的情况下,ADF 中可能没有通用解决方案。new_arr[5]