Подтвердить что ты не робот

Программа Spark дает нечетные результаты при работе в автономном кластере

У меня есть эта искровая программа, и я попытаюсь ограничить ее только соответствующими частями

# Split by delimiter ,
# If the file is in unicode, we need to convert each value to a float in order to be able to 
# treat it as a number
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist()

# start with K randomly selected points from the dataset
# A centroid cannot be an actual data point or else the distance measure between a point and 
# that centroid will be zero. This leads to an undefined membership value into that centroid.
centroids = points.takeSample(False, K, 34)
#print centroids
# Initialize our new centroids
newCentroids = [[] for k in range(K)]
tempCentroids = []
for centroid in centroids:
    tempCentroids.append([centroid[N] + 0.5])
#centroids = sc.broadcast(tempCentroids)

convergence = False

ncm = NCM()

while(not convergence):
    memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m)))
    cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value)))
    # Memberships
    T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)))
    I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2,  weight3, c)[0]))
    F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1,  weight2, weight3, c)[0]))
    # Components of new centroids
    wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t))))
    #print "wTm = " + str(wTm.collect())
    print "at first reduce"
    sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2))
    #print "sumwTm = " + str(sumwTm.collect())
    wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t))))
    print "adding to cnumerator list"
    #print wTmx.collect()
    cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values()
    print "collected cnumerator, now printing"    
    #print "cnumerator = " + str(cnumerator.collect())
    #print str(sumwTm.collect())
    # Calculate the new centroids
    sumwTmCollection = sumwTm.collect()[0][1]
    cnumeratorCollection = cnumerator.collect()
    #print "sumwTmCollection = " + str(sumwTmCollection)
    #cnumeratorCollection =cnumerator.collectAsMap().get(0).items
    print "cnumeratorCollection = " + str(cnumeratorCollection)
    for i in range(len(newCentroids)):
        newCentroids[i] = scalarMult(1 / sumwTmCollection[i], [cnumeratorCollection[i]])
    centroids = newCentroids
    # Test for convergence
    convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon)

    #convergence = True 
    # Replace our old centroids with the newly found centroids and repeat if convergence not met
    # Clear out space for a new set of centroids
    newCentroids = [[] for k in range(K)]

Эта программа работает очень хорошо на моей локальной машине, однако она не ведет себя так, как ожидалось, при запуске в автономном кластере. Это не обязательно вызывает ошибку, но то, что она делает, дает другой результат, чем тот, который я получаю при работе на моей локальной машине. Кажется, что кластер и 3 узла работают нормально. У меня возникло ощущение, что я продолжаю обновлять centroids, который является списком python, и каждый раз он изменяется через while-loop. Возможно ли, что каждый node может не иметь самую последнюю копию этого списка? Я так думаю, поэтому я попытался использовать broadcast variable, но они не могут быть обновлены (только для чтения). Я также попытался использовать accumulator, но это только для накопления. Я также попытался сохранить списки python как файл в hdfs для каждого node, чтобы иметь доступ, но это не сработало. Как вы думаете, я правильно понимаю проблему? Может, здесь что-то еще происходит? Как я могу получить код, который отлично работает на моей локальной машине, но не в кластере?

4b9b3361

Ответ 1

Спасибо за все время и внимание к этой проблеме, тем более, что, похоже, я мог опубликовать дополнительную информацию, чтобы облегчить вашу работу. Проблема здесь в

centroids = points.takeSample(False, K, 34)

Я этого не осознавал, но после короткого эксперимента эта функция возвращает один и тот же вывод каждый раз, несмотря на то, что я считал случайным. Пока вы используете одно и то же семя (в этом случае 34), вы получите тот же RDD взамен. По какой-то причине RDD на моем кластере был иным, чем тот, который вернулся на мою локальную машину. В любом случае, поскольку это был тот же RDD каждый раз, мой вывод никогда не менялся. Проблема с "случайными" центроидами вернулась ко мне, так это то, что эти конкретные порождали нечто вроде седловой точки в математике, где не было бы конвергенции центроидов. Эта часть ответа является математической и программирующей, поэтому я не буду упоминать ее дальше. Моя настоящая надежда на этот момент заключается в том, что другим помогает идея, что если вы хотите

centroids = points.takeSample(False, K, 34)

для создания разных образцов каждый раз, когда он вызывается, вы каждый раз меняете свое семя на какое-то случайное число.

Надеюсь, все это поможет. Я никогда не тратил столько времени на решение моей памяти.

Еще раз спасибо.