Clojure : Group-by 太慢(1300 万行文件)

Clojure : Group-by too slow (13 million-lines file)

提问人:Joseph Yourine 提问时间:2/1/2016 最后编辑:Joseph Yourine 更新时间:2/2/2016 访问量:768

问:

情况

我有一个 1300 万行的 CSV,我想对每个组执行逻辑回归 (incanter)。 我的文件就是这样(值只是示例)

ID Max Probability
1  1   0.5 
1  5   0.6
1  10  0.99
2  1   0.1
2  7   0.95

所以我首先用csv阅读器阅读它,everithing很好。

然后我有这样的东西:

( {"Id" "1", "Max" 1, "Probability" 0.5} {"Id" "1", "Max" 5, "Probability" 0.6} etc.

我想按 Id 对这些值进行分组,如果我没记错的话,大约有 12 百万个 Id。(我用 pandas 在 Python 中做到了,而且速度超快)

这是我读取和格式化文件的功能(它在较小的数据集上工作正常):

  (defn read-file
  []
    (let [path (:path-file @config)
          content-csv (take-csv path \,)]
      (->> (group-by :Id content-csv)
           (map (fn [[k v]]
                [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
           (into {}))))

我最终想要这样的东西来执行逻辑回归(我对此很灵活,不需要 :x 和 :y 的向量,seqs 就可以了)

{"1" {:x [1 5 10] :y [0.5 0.6 0.99]} "2" {:x [1 7] :y [0.1 0.95]} etc.

问题

我在分组操作中遇到了问题。我在 CSV 的输出上单独尝试了它,当它不会因为 Java 堆空间内存而消失时,这将永远花费。 我以为问题出在我的mapv上,但这是分组。

我考虑过使用reduce或reduce-kv,但我不知道如何将这些函数用于这种目的。

我不在乎“:x”和“:y”的顺序(只要它们之间相同,我的意思是 x 和 y 具有相同的索引......不是问题,因为它们在同一行上)和最终结果的 ID,我阅读了分组保持顺序。 也许这就是手术成本高昂的?

如果有人遇到过这种情况,我会给你样本数据:

(def sample '({"Id" "1" "Max" 1 "Probability" 0.5} {"Id" "1" "Max" 5 "Probability" 0.6} {"Id" "1" "Max" 10 "Probability" 0.99} {"Id" "2" "Max" 1 "Probability" 0.1} {"Id" "2" "Max" 7 "Probability" 0.95}))

其他选择

我有其他想法,但我不确定它们是否对“Clojure”友好。

  • 在 Python 中,由于函数的性质和文件已经排序,我没有使用 group-by,而是在数据帧中为每个组编写了开始和结束索引,这样我只需要直接选择子数据选项卡。

  • 我还可以加载 ID 列表,而不是从 Clojure 计算它。 喜欢

    (def ids '(“1”, “2” 等。

所以也许可以从以下方面开始:

{"1" {:x [] :y []} "2" {:x [] :y []} etc.

从上一个序列,然后匹配每个 ID 上的大文件。

我不知道它是否实际上更有效率。

我有所有其他逻辑回归函数,我只是缺少这部分! 谢谢!

编辑

感谢您的回答,我终于有了这个解决方案。

在我的project.clj文件中

 :jvm-opts ["-Xmx13g"])

法典:

(defn data-group->map [group]
  {(:Id (first group))
   {:x (map :Max group)
    :y (map :Probability group)}})


(defn prob-cumsum [data]
  (cag/fmap
    (fn [x]
      (assoc x :y (reductions + (x :y))))
  data))


(defn process-data-splitter [data]
  (->> (partition-by :Id data)
       (map data-group->map)
       (into {})
       (prob-cumsum)))

我打包了我所有的代码,它有效。拆分大约需要 5 分钟,但我不需要超高速。用于文件读取的内存使用量可以上升到所有内存,而 sigmoid 的内存使用量可以减少。

Clojure Group-by incanter

评论

1赞 Daniel Compton 2/1/2016
ids 的基数是高还是低?csv 中的 ID 是否按顺序排列?如果是这样,您可以在单次阅读 CSV 时进行分组。
0赞 Joseph Yourine 2/1/2016
您好,感谢您的回复。我有大约 1.2-130 万个 ID(比实际数据少 10 倍)。该文件的排序方式与我的例子类似,即:第一级 = ID,第二级 = 最大值(概率和最大值的顺序相同,因为它们由一条增长曲线链接)。所以也许你的想法很好,但我仍然不知道该怎么做。循环是个好主意吗?我认为它没有利用多处理的好处。我将通过重新格式化数据来尝试合并。

答:

6赞 leetwinski 2/1/2016 #1

如果您的文件按 ID 排序,则可以使用 代替 .partition-bygroup-by

那么你的代码将如下所示:

(defn data-group->map [group]
  [(:Id (first group))
   {:x (mapv :Max group)
    :y (mapv :Probability group)}])

(defn read-file []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (->> content-csv
         (partition-by :Id)
         (map data-group->map)
         (into {}))))

这应该会加快速度。 然后,您可以使用换能器使其更快

(defn read-file []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (into {} (comp (partition-by :Id)
                   (map data-group->map))
          content-csv)))

让我们做一些测试:

首先生成像您这样的海量数据:

(def huge-data
  (doall (mapcat #(repeat 
                     1000000
                     {:Id % :Max 1 :Probability 10})
           (range 10))))

我们有 1000 万个项目数据集,有 million of 、 million of 等等。{:Id 0 :Max 1 :Probability 10}{:Id 1 :Max 1 :Probability 10}

现在要测试的函数:

(defn process-data-group-by [data]
  (->> (group-by :Id data)
       (map (fn [[k v]]
              [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
       (into {})))

(defn process-data-partition-by [data]
  (->> data
       (partition-by :Id)
       (map data-group->map)
       (into {})))

(defn process-data-transducer [data]
  (into {} (comp (partition-by :Id) (map data-group->map)) data))

现在是时间测试:

(do (time (dorun (process-data-group-by huge-data)))
    (time (dorun (process-data-partition-by huge-data)))
    (time (dorun (process-data-transducer huge-data))))

"Elapsed time: 3377.167645 msecs"
"Elapsed time: 3707.03448 msecs"
"Elapsed time: 1462.955152 msecs"

请注意,这会产生惰性序列,而 group-by 应该实现整个集合。因此,如果您需要逐组而不是整个地图的数据,则可以更快地删除和访问每个数据:partition-by(into {})

(defn process-data-partition-by [data]
  (->> data
       (partition-by :Id)
       (map data-group->map)))

检查:

user> (time (def processed-data (process-data-partition-by huge-data)))
"Elapsed time: 0.06079 msecs"
#'user/processed-data
user> (time (let [f (first processed-data)]))
"Elapsed time: 302.200571 msecs"
nil
user> (time (let [f (second processed-data)]))
"Elapsed time: 500.597153 msecs"
nil
user> (time (let [f (last processed-data)]))
"Elapsed time: 2924.588625 msecs"
nil
user.core> (time (let [f (last processed-data)]))
"Elapsed time: 0.037646 msecs"
nil

评论

0赞 Joseph Yourine 2/1/2016
您好,感谢您的回答。我用你的样本数据尝试了你的解决方案,它的速度要快得多。使用我的 CSV,它非常慢。所以也许原因是用啜饮读取文件。我不知道如何解决它,但似乎分组不是真正的问题(即使我从你的帖子中学到了更好的解决方案)。但问题是我在使用 def 时遇到了 Java 堆空间问题,奇怪的是我有 16 个 Go 内存。
0赞 leetwinski 2/1/2016
你好!你如何加载和解析你的 css 文件?你能更新你的问题吗?
0赞 leetwinski 2/2/2016
java 堆空间的问题可能可以通过 jvm 调优来解决,方法是设置值。stackoverflow.com/questions/14763079/.......但真正的问题可能与您保留所有加载的数据(甚至是不需要的)有关。Xmx
0赞 Joseph Yourine 2/2/2016
是的,我会调查的。我真的看不出有什么不懒惰的,因为当单独应用时,操作似乎很快(甚至 csv 阅读器也返回一个懒惰的序列),但是当涉及到包装它们时,某处存在问题。很奇怪,因为我处理的是来自 Google Cloud 的大量数据,而且我没有遇到任何问题。
0赞 Joseph Yourine 2/2/2016
它终于奏效了,谢谢。我根据我的使用情况调整了您的代码并增加了 java 堆空间!编辑了我的帖子。