提问人:deceze 提问时间:6/16/2017 最后编辑:deceze 更新时间:10/19/2017 访问量:880
区间树增加了子集匹配的维度?
Interval tree with added dimension of subset matching?
问:
这是一个关于一个有点复杂的问题的算法问题。其基础是:
基于可用时段和预留时段的调度系统。插槽有一定的标准,我们称它们为标签。如果可用槽的标记集是预留槽的超集,则这些标记会将预留槽与可用槽匹配。
举个具体的例子,以这个场景为例:
11:00 12:00 13:00
+--------+
| A, B |
+--------+
+--------+
| C, D |
+--------+
在 11:00 到 12:30 之间,可以预订标签,可以在 12:00 到 13:30 之间进行预订,并且从大约 12:00 到 12:30 有重叠。A
B
C
D
11:00 12:00 13:00
+--------+
| A, B |
+--------+
+--------+
| C, D |
+--------+
xxxxxx
x A x
xxxxxx
这里已经预订了,所以在11:15到12:00之间没有其他预订。A
A
B
简而言之,就是这个想法。可用插槽没有特定限制:
- 一个可用插槽可以包含任意数量的标签
- 任意数量的插槽可以随时重叠
- 插槽长度不限
- 预留可以包含任意数量的标记
系统中唯一需要遵守的规则是:
- 添加预留时,必须至少有一个剩余的可用时段与预留中的所有标签匹配
澄清一下:当同时有两个可用插槽时,例如,标签,那么当时可以进行两次预订,但不能再预订了。A
A
我正在使用区间树的修改实现;快速概述:
- 所有可用的插槽都将添加到间隔树中(保留重复/重叠)
- 所有保留插槽都会迭代,并且:
- 从树中查询与预订时间匹配的所有可用时段
- 第一个与预留标签匹配的标签将被切片,并将切片从树中删除
该过程完成后,剩下的是剩余的可用插槽切片,我可以查询是否可以在特定时间进行新的预订并添加它。
数据结构如下所示:
{
type: 'available',
begin: 1497857244,
end: 1497858244,
tags: [{ foo: 'bar' }, { baz: 42 }]
}
{
type: 'reserved',
begin: 1497857345,
end: 1497857210,
tags: [{ foo: 'bar' }]
}
标签本身就是键值对象,它们的列表就是一个“标签集”。如果有帮助,可以连载这些;到目前为止,我使用的是 Python 类型,这使得比较变得足够容易。槽开始/结束时间是树中的 UNIX 时间戳。我不是特别喜欢这些特定的数据结构,如果有用,我可以重构它们。set
我面临的问题是,这不能没有错误;每隔一段时间,一个预订就会潜入与其他预订冲突的系统,我还不知道这究竟是如何发生的。当标签以复杂的方式重叠时,这也不是很聪明,需要计算最佳分布,以便所有预留都可以尽可能地适应可用插槽;事实上,目前,在重叠方案中,如何将预留与可用时段匹配是不确定的。
我想知道的是:间隔树大多非常适合此目的,但是我目前添加标签集匹配作为附加维度的系统笨拙且固定不堪;有没有一种数据结构或算法可以优雅地处理这个问题?
必须支持的操作:
- 在系统中查询与某些标签集匹配的可用插槽(考虑到可能会降低可用性但本身不属于该标签集的预留;例如,在上面的示例中查询 )。
B
- 确保没有匹配可用插槽的系统无法添加任何预订。
答:
您的问题可以使用约束编程来解决。在 python 中,这可以使用 python-constraint 库来实现。
首先,我们需要一种方法来检查两个插槽是否彼此一致。这是一个函数,如果两个插槽共享一个标签并且它们的 rimeframe 重叠,则返回 true。在 python 中,这可以使用以下函数实现
def checkNoOverlap(slot1, slot2):
shareTags = False
for tag in slot1['tags']:
if tag in slot2['tags']:
shareTags = True
break
if not shareTags: return True
return not (slot2['begin'] <= slot1['begin'] <= slot2['end'] or
slot2['begin'] <= slot1['end'] <= slot2['end'])
我不确定你是希望标签完全相同(如 {foo: bar} 等于 {foo:bar})还是只希望键(如 {foo: bar} 等于 {foo:qux}),但您可以在上面的函数中更改它。
一致性检查
我们可以将 python-constraint 模块用于您请求的两种功能。
第二个功能是最简单的。为了实现这一点,我们可以使用将所提供数据结构中的插槽列表作为输入的函数。然后,该函数会将所有插槽提供给 python-constraint,并检查插槽列表是否一致(没有 2 个不应重叠、重叠的插槽)并返回一致性。isConsistent(set)
def isConsistent(set):
#initialize python-constraint context
problem = Problem()
#add all slots the context as variables with a singleton domain
for i in range(len(set)):
problem.addVariable(i, [set[i]])
#add a constraint for each possible pair of slots
for i in range(len(set)):
for j in range(len(set)):
#we don't want slots to be checked against themselves
if i == j:
continue
#this constraint uses the checkNoOverlap function
problem.addConstraint(lambda a,b: checkNoOverlap(a, b), (i, j))
# getSolutions returns all the possible combinations of domain elements
# because all domains are singleton, this either returns a list with length 1 (consistent) or 0 (inconsistent)
return not len(problem.getSolutions()) == 0
每当用户想要添加预留槽位时,都可以调用此函数。可以将输入插槽添加到现有插槽列表中,并且可以检查一致性。如果一致,则保留新插槽。否则,新插槽将重叠,应被拒绝。
查找可用插槽
这个问题有点棘手。我们可以使用与上述相同的功能,但进行一些重大更改。我们现在不想将新插槽与现有插槽一起添加,而是将所有可能的插槽添加到现有插槽中。然后,我们可以检查所有这些可能的插槽与保留插槽的一致性,并要求约束系统提供一致的组合。
因为如果我们不对它施加任何限制,可能的插槽数量将是无限的,所以我们首先需要为程序声明一些参数:
MIN = 149780000 #available time slots can never start earlier then this time
MAX = 149790000 #available time slots can never start later then this time
GRANULARITY = 1*60 #possible time slots are always at least one minut different from each other
我们现在可以继续 main 函数。它看起来很像一致性检查,但我们现在添加一个变量来发现所有可用的插槽,而不是来自用户的新插槽。
def availableSlots(tags, set):
#same as above
problem = Problem()
for i in range(len(set)):
problem.addVariable(i, [set[i]])
#add an extra variable for the available slot is added, with a domain of all possible slots
problem.addVariable(len(set), generatePossibleSlots(MIN, MAX, GRANULARITY, tags))
for i in range(len(set) +1):
for j in range(len(set) +1):
if i == j:
continue
problem.addConstraint(lambda a, b: checkNoOverlap(a, b), (i, j))
#extract the available time slots from the solution for clean output
return filterAvailableSlots(problem.getSolutions())
我使用一些辅助函数来保持代码更干净。它们都包含在这里。
def filterAvailableSlots(possibleCombinations):
result = []
for slots in possibleCombinations:
for key, slot in slots.items():
if slot['type'] == 'available':
result.append(slot)
return result
def generatePossibleSlots(min, max, granularity, tags):
possibilities = []
for i in range(min, max - 1, granularity):
for j in range(i + 1, max, granularity):
possibleSlot = {
'type': 'available',
'begin': i,
'end': j,
'tags': tags
}
possibilities.append(possibleSlot)
return tuple(possibilities)
您现在可以将函数 getAvailableSlots(tags, set) 与需要可用插槽的标签和一组已保留的插槽一起使用。请注意,此函数实际上返回所有一致的可能插槽,因此无需努力查找最大长度的插槽或其他优化插槽。
希望这有帮助!(我让它按照你在我的 pycharm 中描述的那样工作)
评论
Problem
checkNoOverlap
这里有一个解决方案,我将包含下面的所有代码。
1. 创建时段表和预订表
2. 创建预订 x 插槽矩阵
它由真值或假值填充,具体取决于预留槽组合是否可行
3. 找出允许最多预留槽位组合的最佳映射
注意:我目前的解决方案在非常大的数组中扩展性很差,因为它涉及遍历大小 = 插槽数的列表的所有可能排列。我发布了另一个问题,看看是否有人能找到更好的方法来做到这一点。但是,此解决方案是准确的,可以优化
Python 代码源
第 1 部分
from IPython.display import display
import pandas as pd
import datetime
available_data = [
['SlotA', datetime.time(11, 0, 0), datetime.time(12, 30, 0), set(list('ABD'))],
['SlotB',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('C'))],
['SlotC',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('ABCD'))],
['SlotD',datetime.time(12, 0, 0), datetime.time(13, 30, 0), set(list('AD'))],
]
reservation_data = [
['ReservationA', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('AD'))],
['ReservationB', datetime.time(11, 15, 0), datetime.time(12, 15, 0), set(list('A'))],
['ReservationC', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
['ReservationD', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('C'))],
['ReservationE', datetime.time(12, 0, 0), datetime.time(12, 15, 0), set(list('D'))]
]
reservations = pd.DataFrame(data=reservation_data, columns=['reservations', 'begin', 'end', 'tags']).set_index('reservations')
slots = pd.DataFrame(data=available_data, columns=['slots', 'begin', 'end', 'tags']).set_index('slots')
display(slots)
display(reservations)
第 2 部分
def is_possible_combination(r):
return (r['begin'] >= slots['begin']) & (r['end'] <= slots['end']) & (r['tags'] <= slots['tags'])
solution_matrix = reservations.apply(is_possible_combination, axis=1).astype(int)
display(solution_matrix)
第 3 部分
import numpy as np
from itertools import permutations
# add dummy columns to make the matrix square if it is not
sqr_matrix = solution_matrix
if sqr_matrix.shape[0] > sqr_matrix.shape[1]:
# uhoh, there are more reservations than slots... this can't be good
for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
sqr_matrix.loc[:,'FakeSlot' + str(i)] = [1] * sqr_matrix.shape[0]
elif sqr_matrix.shape[0] < sqr_matrix.shape[1]:
# there are more slots than customers, why doesn't anyone like us?
for i in range(sqr_matrix.shape[0] - sqr_matrix.shape[1]):
sqr_matrix.loc['FakeCustomer' + str(i)] = [1] * sqr_matrix.shape[1]
# we only want the values now
A = solution_matrix.values.astype(int)
# make an identity matrix (the perfect map)
imatrix = np.diag([1]*A.shape[0])
# randomly swap columns on the identity matrix until they match.
n = A.shape[0]
# this will hold the map that works the best
best_map_so_far = np.zeros([1,1])
for column_order in permutations(range(n)):
# this is an identity matrix with the columns swapped according to the permutation
imatrix = np.zeros(A.shape)
for row, column in enumerate(column_order):
imatrix[row,column] = 1
# is this map better than the previous best?
if sum(sum(imatrix * A)) > sum(sum(best_map_so_far)):
best_map_so_far = imatrix
# could it be? a perfect map??
if sum(sum(imatrix * A)) == n:
break
if sum(sum(imatrix * A)) != n:
print('a perfect map was not found')
output = pd.DataFrame(A*imatrix, columns=solution_matrix.columns, index=solution_matrix.index, dtype=int)
display(output)
评论
Arne 和 Tinker 建议的方法都有帮助,但最终还不够。我想出了一种混合方法,可以很好地解决这个问题。
主要问题是这是一个三维问题,很难同时解决所有维度的问题。这不仅仅是匹配时间重叠或标签重叠,而是将时间片与标签重叠相匹配。根据时间甚至标签将槽位与其他槽匹配非常简单,但将已经匹配的可用性槽位与另一个时间的另一个预留相匹配就非常复杂了。这意味着,在以下情况下,一个可用性可以涵盖不同时间的两个预订:
+---------+
| A, B |
+---------+
xxxxx xxxxx
x A x x A x
xxxxx xxxxx
试图将其融入到基于约束的编程中需要一种极其复杂的约束关系,这几乎是难以管理的。我的解决方案是简化问题......
删除一个维度
它不是一次解决所有维度,而是极大地简化了问题,在很大程度上消除了时间维度。我通过使用现有的间隔树并根据需要对其进行切片来做到这一点:
def __init__(self, slots):
self.tree = IntervalTree(slots)
def timeslot_is_available(self, start: datetime, end: datetime, attributes: set):
candidate = Slot(start.timestamp(), end.timestamp(), dict(type=SlotType.RESERVED, attributes=attributes))
slots = list(self.tree[start.timestamp():end.timestamp()])
return self.model_is_consistent(slots + [candidate])
为了查询特定时隙是否可用,我只取该特定时间相关的时隙(),这将计算的复杂性降低到局部子集:self.tree[..:..]
| | +-+ = availability
+-|------|-+ xxx = reservation
| +---|------+
xx|x xxx|x
| xxxx|
| |
然后我确认了这个狭窄切片内的一致性:
@staticmethod
def model_is_consistent(slots):
def can_handle(r):
return lambda a: r.attributes <= a.attributes and a.contains_interval(r)
av = [s for s in slots if s.type == SlotType.AVAILABLE]
rs = [s for s in slots if s.type == SlotType.RESERVED]
p = Problem()
p.addConstraint(AllDifferentConstraint())
p.addVariables(range(len(rs)), av)
for i, r in enumerate(rs):
p.addConstraint(can_handle(r), (i,))
return p.getSolution() is not None
(我在这里省略了一些优化和其他代码。
这部分是 Arne 和 tinker 建议的混合方法。它使用基于约束的编程来查找匹配的插槽,并使用 tinker 建议的矩阵算法。基本上:如果有任何解决方案可以将所有预留分配给不同的可用时段,则此时间片处于一致状态。由于我传递了所需的预留槽,如果模型(包括该槽)仍然一致,这意味着保留该槽是安全的。
如果在此狭窄的窗口内有两个可分配给同一可用性的短期预留,但这种情况的可能性很低,并且结果只是可用性查询的假阴性,这仍然存在问题;误报会更成问题。
查找可用插槽
查找所有可用插槽是一个更复杂的问题,因此再次需要进行一些简化。首先,只能查询模型中特定标记集的可用性(没有“给我所有全局可用的插槽”),其次只能使用特定的粒度(所需的插槽长度)进行查询。这非常适合我的特定用例,我只需要为用户提供他们可以保留的插槽列表,例如 9:15-9:30、9:30-9:45 等。通过重用上述代码,这使得算法变得非常简单:
def free_slots(self, start: datetime, end: datetime, attributes: set, granularity: timedelta):
slots = []
while start < end:
slot_end = start + granularity
if self.timeslot_is_available(start, slot_end, attributes):
slots.append((start, slot_end))
start += granularity
return slots
换句话说,它只是在给定的时间间隔内遍历所有可能的插槽,并从字面上检查该插槽是否可用。这有点蛮力解决方案,但效果很好。
评论