🎄teratailクリスマスプレゼントキャンペーン2024🎄』開催中!

\teratail特別グッズやAmazonギフトカード最大2,000円分が当たる!/

詳細はこちら
Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

Q&A

解決済

3回答

1620閲覧

pythonでList内のSetを特定の条件で高速に結合したい

rinoguchi

総合スコア9

Python

Pythonは、コードの読みやすさが特徴的なプログラミング言語の1つです。 強い型付け、動的型付けに対応しており、後方互換性がないバージョン2系とバージョン3系が使用されています。 商用製品の開発にも無料で使用でき、OSだけでなく仮想環境にも対応。Unicodeによる文字列操作をサポートしているため、日本語処理も標準で可能です。

0グッド

2クリップ

投稿2019/12/18 07:25

編集2020/01/14 00:44

前提・実現したいこと

python v3.7.4で、List内のSetを特定の条件で高速に結合したいです。

具体的には、以下の様なListをインプットとして、
例) all_groups: List[Set[int]] = [{1, 2}, {2, 3}, {4, 5, 6}, {7, 8, 9}, {6, 8, 9}, {8, 10}, {11, 12}, {1, 2}, {2, 3}]

List内のSet同士を比較し、Set内に同一の数字が含まれる場合はSetを結合していき、最終的に
例) unified_groups: List[Set[int]] = [{1, 2, 3}, {4, 5, 6, 7, 8, 9, 10}, {11, 12}]
の様なリストを作成したいです。

実際には、all_groupsのサイズが数百万〜数千万件あり、メモリやCPUなどのリソースは潤沢という前提で、これをなるべく早く処理したいのですが、冴えた方法が思いつかず、良い方法があれば教えていただけると非常に助かります。

試したこと

昨日から、以下のコードで実行してみてるのですが、数千万件を処理すると1日経っても返ってこない状況です。

python

1from typing import List, Set 2 3def unify_group(all_groups: List[Set[int]]) -> List[Set[int]]: 4 unified_groups: List[Set[int]] = [] 5 for group in all_groups: 6 # 交差集合のリストを取得 7 detected_unified_groups: List[Set[int]] = [] 8 for unified_group in unified_groups: 9 if unified_group.isdisjoint(group) is False: 10 detected_unified_groups.append(unified_group) 11 12 if len(detected_unified_groups) == 0: 13 # 交差集合がなければ、groupをそのまま追加 14 unified_groups.append(group) 15 else: 16 # 交差集合があれば、既存グループを削除して、groupとdetected_unified_groupの和集合を追加 17 unified_group: Set[int] = set() 18 for detected_unified_group in detected_unified_groups: 19 unified_groups.remove(detected_unified_group) 20 unified_group = unified_group.union(detected_unified_group) 21 unified_groups.append(unified_group) 22 23 return unified_groups

気になる質問をクリップする

クリップした質問は、後からいつでもMYページで確認できます。

またクリップした質問に回答があった際、通知やメールを受け取ることができます。

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

LouiS0616

2019/12/18 07:59

要素は必ず整数値ですか?また、上限・下限は既知ですか?
rinoguchi

2019/12/18 08:15

確認ありがとうございます! 要素は必ず整数値で、上限・加減は既知です。 (下限は1で、上限は9000万ぐらいの値になります)
guest

回答3

0

union-findというデータ構造を使う案はどうでしょうか。
実装はココの物を借りました。

python

1class UnionFind(): 2 def __init__(self, n): 3 self.n = n 4 self.parents = [-1] * n 5 6 def find(self, x): 7 if self.parents[x] < 0: 8 return x 9 else: 10 self.parents[x] = self.find(self.parents[x]) 11 return self.parents[x] 12 13 def union(self, x, y): 14 x = self.find(x) 15 y = self.find(y) 16 17 if x == y: 18 return 19 20 if self.parents[x] > self.parents[y]: 21 x, y = y, x 22 23 self.parents[x] += self.parents[y] 24 self.parents[y] = x 25 26 def size(self, x): 27 return -self.parents[self.find(x)] 28 29 def same(self, x, y): 30 return self.find(x) == self.find(y) 31 32 def members(self, x): 33 root = self.find(x) 34 return [i for i in range(self.n) if self.find(i) == root] 35 36 def roots(self): 37 return [i for i, x in enumerate(self.parents) if x < 0] 38 39 def group_count(self): 40 return len(self.roots()) 41 42 def all_group_members(self): 43 return {r: self.members(r) for r in self.roots()} 44 45 def __str__(self): 46 return '\n'.join('{}: {}'.format(r, self.members(r)) for r in self.roots()) 47 48 49all_groups = [{1, 2}, {2, 3}, {4, 5, 6}, {7, 8, 9}, {6, 8, 9}, {8, 10}, {11, 12}, {1, 2}, {2, 3}] 50uf = UnionFind(13) # all_groupsに登場する最大値+1 51 52for a, *others in all_groups: 53 for b in others: 54 uf.union(a, b) 55print(uf) 56""" 570: [0] 581: [1, 2, 3] 598: [4, 5, 6, 7, 8, 9, 10] 6011: [11, 12] 61"""

投稿2019/12/18 09:06

退会済みユーザー

退会済みユーザー

総合スコア0

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

rinoguchi

2019/12/18 15:32

ご回答ありがとうございます! Union Find(素集合データ構造)と言うものがあるんですね。まさに今回のデータ構造そのものでした。 実際のデータで試して見たところ、unionするところまではすぐに終わるのですが、結果をList[Set[int]]の形で取得する処理を `unified_groups: List[Set[int]] = list(map(lambda r: set(uf.members(r)), uf.roots()))` こんな感じで実装したのですが、この部分にかなり時間がかかってしまいました。 お忙しいところご検討いただき、ありがとうございました。
guest

0

無駄があるけと・・・

py

1from collections import defaultdict 2 3all_groups = [{1, 2}, {2, 3}, {4, 5, 6}, {7, 8, 9}, {6, 8, 9}, {8, 10}, {11, 12}, {1, 2}, {2, 3}] 4 5d = {} 6for v in all_groups: 7 for k in v: 8 if k in d: 9 s = d[k] 10 s.update(v) 11 for c in v: 12 if c in d and s != d[c]: 13 s.update(d[c]) 14 break 15 else: 16 s = set(v) 17 for k in s: 18 d[k] = s 19 20print(list(map(set, set(map(tuple, d.values())))))

投稿2019/12/18 09:02

編集2019/12/18 09:18
shiracamus

総合スコア5406

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

rinoguchi

2019/12/18 15:10

ご回答ありがとうございます! 実データで試して見たところ、100万件=>100.7秒、200万件=>350.9秒で 自分の元の処理よりは圧倒的に速いものの、件数が増えると厳しくなりそうな感じでした。 お忙しいところ検討いただき、ありがとうございました。
guest

0

ベストアンサー

(追記)
もっと素直に組んでみました

python

1def unify_group2(all_groups: List[Set[int]]) -> List[Set[int]]: 2 val_to_index = {} 3 result = [] 4 5 for s in all_groups: 6 indices = {val_to_index.get(n, -1) for n in s} 7 if max(indices) == -1: 8 result.append(s) 9 index = len(result) - 1 10 for n in s: 11 val_to_index[n] = index 12 else: 13 indices = sorted(i for i in indices if i > -1) 14 index = indices[0] 15 for i in indices[1:]: 16 result[index] |= result[i] 17 for j in result[i]: 18 val_to_index[j] = index 19 # 空をセットしておいて後で取り除く 20 result[i] = {} 21 result[index] |= s 22 for n in s: 23 val_to_index[n] = index 24 25 return list(filter(None, result))

私だったら……

python

1from typing import Dict, List, Set 2 3 4def f(x: List[Set[int]]) -> List[Set[int]]: 5 result: List[Set[int]] = [] 6 value_to_index: Dict[int, int] = {} # 数値→resultのインデクス 7 8 for s in x: 9 for v in s: 10 # 既出の要素であれば value_to_index に登録されている 11 if v in value_to_index: 12 idx = value_to_index[v] 13 14 # 集合のマージ 15 result[idx] |= s 16 17 # 辞書に登録 18 for vv in s: 19 value_to_index[vv] = idx 20 break 21 else: 22 # ループが回りきった = 未知の要素しかない 23 24 # sのコピーをresultに追加 25 result.append(set(s)) 26 27 # 辞書に登録 28 idx = len(result) - 1 29 for v in s: 30 value_to_index[v] = idx 31 return result 32 33 34def unify_group(all_groups: List[Set[int]]) -> List[Set[int]]: 35 prev = [] 36 result = all_groups 37 while len(result) != len(prev): 38 prev = result 39 result = f(prev) 40 41 return result

既知の要素が出現したらマージする**だけ(↓)**のfをまず書いて、結果が収束するまで何度も何度も適用します。
(f[{1,2},{3},{2,3}][{1,2,3},{3}]にするだけ)

確認用コード

python

1def check(x): 2 from itertools import permutations 3 for s1, s2 in permutations(x, 2): 4 assert not (s1 & s2) 5 6 7def data(): 8 from random import randint, shuffle 9 x = [set([randint(1, 1000) for j in range(randint(50, 80))]) for i in range(20000)] 10 x += [set([randint(1000, 100_000) for j in range(randint(50, 80))]) for i in range(20000)] 11 x += [set([randint(100_000, 200_000) for j in range(randint(50, 80))]) for i in range(2000)] 12 x += [set([randint(200_001, 1_000_000) for j in range(randint(50, 80))]) for i in range(200)] 13 shuffle(x) 14 15 return x 16 17 18print( 19 unify_group( 20 [{1, 2}, {2, 3}, {4, 5, 6}, {7, 8, 9}, {6, 8, 9}, {8, 10}, {11, 12}, {1, 2}, {2, 3}] 21 ) 22) 23r = unify_group(data()) 24print(len(r)) 25check(r)

投稿2019/12/18 08:41

編集2019/12/18 10:34
quickquip

総合スコア11231

バッドをするには、ログインかつ

こちらの条件を満たす必要があります。

rinoguchi

2020/01/14 00:46 編集

短期間に二つも回答いただきありがとうございます! 詳しくコメントまでかいていただき、とても分かりやすかったです。 実データで検証して見たところ、 一つ目:100万件=>10.4秒、200万件=>13.2秒、1000万件=>39.2秒 二つ目:100万件=>1.1秒、200万件=>2.4秒、1000万件=>13.1秒 と尋常じゃない速度で処理完了し、結果も全く問題ありませんでした。 いただいた回答の中で最も速く、しかも件数が増加しても時間は線形に増えるだけなので、数千万件の本番データを扱う場合でも全然大丈夫そうです。ベストアンサーにさせていただきました。
guest

あなたの回答

tips

太字

斜体

打ち消し線

見出し

引用テキストの挿入

コードの挿入

リンクの挿入

リストの挿入

番号リストの挿入

表の挿入

水平線の挿入

プレビュー

15分調べてもわからないことは
teratailで質問しよう!

ただいまの回答率
85.36%

質問をまとめることで
思考を整理して素早く解決

テンプレート機能で
簡単に質問をまとめる

質問する

関連した質問